1 // rtmp.cpp: Adobe/Macromedia Real Time Message Protocol handler, for Gnash.
2 //
3 // Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
4 // Free Software Foundation, Inc
5 //
6 // This program is free software; you can redistribute it and/or modify
7 // it under the terms of the GNU General Public License as published by
8 // the Free Software Foundation; either version 3 of the License, or
9 // (at your option) any later version.
10 //
11 // This program is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 // GNU General Public License for more details.
15 //
16 // You should have received a copy of the GNU General Public License
17 // along with this program; if not, write to the Free Software
18 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 //
20
21 #ifdef HAVE_CONFIG_H
22 #include "gnashconfig.h"
23 #endif
24
25 #include <iostream>
26 #include <string>
27 #include <map>
28 #include <vector>
29 #include <cerrno>
30 #include <boost/detail/endian.hpp>
31 #include <boost/format.hpp>
32
33 #if ! (defined(_WIN32) || defined(WIN32))
34 # include <netinet/in.h>
35 #endif
36
37 #include "log.h"
38 #include "amf.h"
39 #include "rtmp.h"
40 #include "cque.h"
41 #include "network.h"
42 #include "element.h"
43 #include "utility.h"
44 #include "buffer.h"
45 #include "GnashSleep.h"
46
47 using std::cerr;
48
49 namespace gnash
50 {
51
52 namespace {
53
54 /// Function object for matching C strings alphabetically.
55 class MatchFirst
56 {
57 public:
MatchFirst(const char * match)58 MatchFirst(const char* match) : _match(match) {}
59
operator ()(const RTMP::AMFProperties::value_type & a)60 bool operator()(const RTMP::AMFProperties::value_type& a) {
61 return std::strcmp(a.first, _match) == 0;
62 }
63 private:
64 const char* _match;
65 };
66
67 }
68
69
70 CQue incoming;
71
72
73 // extern std::map<int, Handler *> handlers;
74
75 const char *content_str[] = {
76 "None",
77 "Chunk Size",
78 "Abort",
79 "Bytes Read",
80 "User",
81 "Window Size",
82 "Set Bandwidth",
83 "Route",
84 "Audio Data",
85 "Video Data",
86 "Shared Object",
87 "Blank 0xb",
88 "Blank 0xc",
89 "Blank 0xd",
90 "Blank 0xe",
91 "AMF3 Notify",
92 "AMF3 Shared Object",
93 "AMF3_INVOKE",
94 "Notify",
95 "Blank 0x13",
96 "Invoke",
97 "Blank 0x15",
98 "FLV Data"
99 };
100
101 const char *ping_str[] = {
102 "PING_CLEAR",
103 "PING_PLAY",
104 "Unknown Ping 2",
105 "PING_TIME",
106 "PING_RESET",
107 "Unknown Ping 2",
108 "PING_CLIENT",
109 "PONG_CLIENT"
110 };
111
112 const char *status_str[] = {
113 "APP_GC",
114 "APP_RESOURCE_LOWMEMORY",
115 "APP_SCRIPT_ERROR",
116 "APP_SCRIPT_WARNING",
117 "APP_SHUTDOWN",
118 "NC_CALL_BADVERSION",
119 "NC_CALL_FAILED",
120 "NC_CONNECT_APPSHUTDOWN",
121 "NC_CONNECT_CLOSED",
122 "NC_CONNECT_FAILED",
123 "NC_CONNECT_INVALID_APPLICATION",
124 "NC_CONNECT_REJECTED",
125 "NC_CONNECT_SUCCESS",
126 "NS_CLEAR_FAILED",
127 "NS_CLEAR_SUCCESS",
128 "NS_DATA_START",
129 "NS_FAILED",
130 "NS_INVALID_ARGUMENT",
131 "NS_PAUSE_NOTIFY",
132 "NS_PLAY_COMPLETE",
133 "NS_PLAY_FAILED",
134 "NS_PLAY_FILE_STRUCTURE_INVALID",
135 "NS_PLAY_INSUFFICIENT_BW",
136 "NS_PLAY_NO_SUPPORTED_TRACK_FOUND",
137 "NS_PLAY_PUBLISHNOTIFY",
138 "NS_PLAY_RESET",
139 "NS_PLAY_START",
140 "NS_PLAY_STOP",
141 "NS_PLAY_STREAMNOTFOUND",
142 "NS_PLAY_SWITCH",
143 "NS_PLAY_UNPUBLISHNOTIFY",
144 "NS_PUBLISH_BADNAME",
145 "NS_PUBLISH_START",
146 "NS_RECORD_FAILED",
147 "NS_RECORD_NOACCESS",
148 "NS_RECORD_START",
149 "NS_RECORD_STOP",
150 "NS_SEEK_FAILED",
151 "NS_SEEK_NOTIFY",
152 "NS_UNPAUSE_NOTIFY",
153 "NS_UNPUBLISHED_SUCCESS",
154 "SO_CREATION_FAILED",
155 "SO_NO_READ_ACCESS",
156 "SO_NO_WRITE_ACCESS",
157 "SO_PERSISTENCE_MISMATCH"
158 };
159
160
161 // These are the textual responses
162 const char *response_str[] = {
163 "/onStatus",
164 "/onResult",
165 "/onDebugEvents"
166 };
167
168 int
headerSize(std::uint8_t header)169 RTMP::headerSize(std::uint8_t header)
170 {
171 // GNASH_REPORT_FUNCTION;
172 int headersize = header & RTMP_HEADSIZE_MASK;
173
174 if (headersize == 0) {
175 headersize = 12;
176 } else if (headersize == 0x80) {
177 headersize = 4;
178 } else if (headersize == 0x40) {
179 headersize = 8;
180 } else if (headersize == 0xc0) {
181 headersize = 1;
182 } else {
183 log_error(_("AMF Header size bits (0x%X) out of range"),
184 header & RTMP_HEADSIZE_MASK);
185 headersize = 1;
186 };
187
188 return headersize;
189 }
190
RTMP()191 RTMP::RTMP()
192 : _handshake(nullptr),
193 _packet_size(0),
194 _mystery_word(0),
195 _timeout(1)
196 {
197 // GNASH_REPORT_FUNCTION;
198
199 _bodysize.resize(MAX_AMF_INDEXES);
200 _type.resize(MAX_AMF_INDEXES);
201
202 // Initialize all of the queues
203 for (int i=0; i<MAX_AMF_INDEXES; i++) {
204 // Name is only used for debugging
205 boost::format fmt("channel #%s");
206 std::string name = (fmt % i).str();
207 _queues[i].setName(name.c_str());
208
209 // each channel can have a different chunksize
210 _chunksize[i] = RTMP_VIDEO_PACKET_SIZE;
211 _lastsize[i] = 0;
212 _bodysize[i] = 0;
213 _type[i] = RTMP::NONE;
214 }
215 }
216
~RTMP()217 RTMP::~RTMP()
218 {
219 // GNASH_REPORT_FUNCTION;
220 _properties.clear();
221 delete _handshake;
222 // delete _handler;
223
224 // delete _body;
225 }
226
227 void
addProperty(cygnal::Element & el)228 RTMP::addProperty(cygnal::Element &el)
229 {
230 // GNASH_REPORT_FUNCTION;
231 _properties[el.getName()] = el;
232 }
233
234 void
addProperty(char * name,cygnal::Element & el)235 RTMP::addProperty(char *name, cygnal::Element &el)
236 {
237 // GNASH_REPORT_FUNCTION;
238 _properties[name] = el;
239 }
240
241
242 cygnal::Element &
getProperty(const std::string & name)243 RTMP::getProperty(const std::string &name)
244 {
245 // Find without inserting.
246 AMFProperties::iterator it = std::find_if(_properties.begin(),
247 _properties.end(), MatchFirst(name.c_str()));
248
249 // If this fails and we return, it will corrupt memory, so either
250 // the assertion never fails or we'll have to return a pointer.
251 assert(it != _properties.end());
252 return it->second;
253 }
254
255 std::shared_ptr<RTMP::rtmp_head_t>
decodeHeader(cygnal::Buffer & buf)256 RTMP::decodeHeader(cygnal::Buffer &buf)
257 {
258 // GNASH_REPORT_FUNCTION;
259 return decodeHeader(buf.reference());
260 }
261
262 std::shared_ptr<RTMP::rtmp_head_t>
decodeHeader(std::uint8_t * in)263 RTMP::decodeHeader(std::uint8_t *in)
264 {
265 // GNASH_REPORT_FUNCTION;
266
267 std::shared_ptr<RTMP::rtmp_head_t> head(new RTMP::rtmp_head_t);
268 std::uint8_t *tmpptr = in;
269
270 head->channel = *tmpptr & RTMP_INDEX_MASK;
271 // log_network (_("The AMF channel index is %d"), head->channel);
272
273 head->head_size = headerSize(*tmpptr++);
274 // log_network (_("The header size is %d"), head->head_size);
275
276 // cerr << "FIXME(" << __FUNCTION__ << "): " << hexify(in,
277 // head->head_size, false) << endl;
278
279 // Make sure the header size is in range, it has to be between
280 // 1-12 bytes.
281 if (head->head_size > RTMP_MAX_HEADER_SIZE) {
282 log_error(_("RTMP Header size can't be more then %d bytes!!"),
283 RTMP_MAX_HEADER_SIZE);
284 head.reset();
285 return head;
286 } else if (head->head_size == 0) {
287 log_error(_("RTMP Header size can't be zero!"));
288 head.reset();
289 return head;
290 }
291
292 if (head->head_size >= 4) {
293 _mystery_word = *tmpptr++;
294 _mystery_word = (_mystery_word << 8) + *tmpptr++;
295 _mystery_word = (_mystery_word << 8) + *tmpptr++;
296 // log_network(_("The mystery word is: %d"), _mystery_word);
297 } else {
298 _mystery_word = 0;
299 }
300
301 if (head->head_size >= 8) {
302 head->bodysize = *tmpptr++;
303 head->bodysize = (head->bodysize << 8) + *tmpptr++;
304 head->bodysize = (head->bodysize << 8) + *tmpptr++;
305 head->bodysize = head->bodysize & 0xffffff;
306 _bodysize[head->channel] = head->bodysize;
307 log_network(_("The body size is: %d"), head->bodysize);
308 } else {
309 // If the body size is zero, we reuse the last body size field
310 // from the previous message, 1 and 4 bytes headers all
311 // reuse the previous body size.
312 head->bodysize = _bodysize[head->channel];
313 if (head->bodysize) {
314 log_network(_("Using previous body size of %d for channel %d"),
315 head->bodysize, head->channel);
316 } else {
317 log_error(_("Previous body size for channel %d is zero!"),
318 head->channel);
319 head.reset();
320 return head;
321 }
322 }
323
324 // the bodysize is limited to two bytes, so if we think we have
325 // more than that, something probably screwed up.
326 if (head->bodysize > 65535) {
327 log_error(_("Suspicious large RTMP packet body size! %d"),
328 head->bodysize);
329 head.reset();
330 return head;
331 }
332
333 if (head->head_size >= 8) {
334 std::uint8_t byte = *tmpptr;
335 head->type = (content_types_e)byte;
336 _type[head->channel] = head->type;
337 tmpptr++;
338 #if 0
339 if (head->type <= RTMP::INVOKE ) {
340 log_network(_("The type is: %s"), content_str[head->type]);
341 } else {
342 log_network(_("The type is: 0x%x"), head->type);
343 }
344 #endif
345 } else {
346 if (_type[head->channel] <= RTMP::FLV_DATA) {
347 log_network(_("Using previous type of %d for channel %d"),
348 head->type, head->channel);
349 head->type = _type[head->channel];
350 }
351 }
352
353 if (head->head_size == 12) {
354 head->src_dest = *(reinterpret_cast<RTMPMsg::rtmp_source_e *>(tmpptr));
355 tmpptr += sizeof(unsigned int);
356 // log_network(_("The source/destination is: %x"), head->src_dest);
357 }
358
359 log_network(_("RTMP %s: channel: %d, head size %d, body size: %d"),
360 ((head->head_size == 1) ? "same" : content_str[head->type]),
361 head->channel,
362 head->head_size,
363 head->bodysize);
364
365 return head;
366 }
367
368 /// \brief \ Each RTMP header consists of the following:
369 ///
370 /// * Index & header size - The header size and amf channel index.
371 /// * Total size - The total size of the message
372 /// * Type - The type of the message
373 /// * Routing - The source/destination of the message
374 //
375
376 std::shared_ptr<cygnal::Buffer>
encodeHeader(int amf_index,rtmp_headersize_e head_size)377 RTMP::encodeHeader(int amf_index, rtmp_headersize_e head_size)
378 {
379 // GNASH_REPORT_FUNCTION;
380 std::shared_ptr<cygnal::Buffer> buf(new cygnal::Buffer(1));
381 buf->clear();
382 std::uint8_t *ptr = buf->reference();
383
384 // Make the channel index & header size byte
385 *ptr = head_size & RTMP_HEADSIZE_MASK;
386 *ptr += amf_index & RTMP_INDEX_MASK;
387
388 return buf;
389 }
390
391 // There are 3 size of RTMP headers, 1, 4, 8, and 12.
392 std::shared_ptr<cygnal::Buffer>
encodeHeader(int amf_index,rtmp_headersize_e head_size,size_t total_size,content_types_e type,RTMPMsg::rtmp_source_e routing)393 RTMP::encodeHeader(int amf_index, rtmp_headersize_e head_size,
394 size_t total_size, content_types_e type,
395 RTMPMsg::rtmp_source_e routing)
396 {
397 // GNASH_REPORT_FUNCTION;
398
399 std::shared_ptr<cygnal::Buffer> buf;
400 switch(head_size) {
401 case HEADER_1:
402 buf.reset(new cygnal::Buffer(1));
403 break;
404 case HEADER_4:
405 buf.reset(new cygnal::Buffer(4));
406 break;
407 case HEADER_8:
408 buf.reset(new cygnal::Buffer(8));
409 break;
410 case HEADER_12:
411 buf.reset(new cygnal::Buffer(12));
412 break;
413 }
414 std::uint8_t *ptr = buf->reference();
415
416 // Make the channel index & header size byte
417 // *ptr = head_size & RTMP_HEADSIZE_MASK;
418 *ptr = head_size; // & RTMP_INDEX_MASK;
419 *ptr += amf_index & RTMP_INDEX_MASK;
420 ptr++;
421
422 // Add the unknown bytes. These seem to be used by video and
423 // audio, and only when the header size is 4 or more.
424 if ((head_size == HEADER_4) || (head_size == HEADER_8) || (head_size == HEADER_12)) {
425 memset(ptr, 0, 3);
426 ptr += 3;
427 }
428
429 // Add the size of the message if the header size is 8 or more.
430 // and add the type of the object if the header size is 8 or more.
431 // length is a 3 byte field
432 if ((head_size == HEADER_8) || (head_size == HEADER_12)) {
433 *ptr++ = (total_size >> 16) & 0xff;
434 *ptr++ = (total_size >> 8) & 0xff;
435 *ptr++ = total_size & 0xff;
436 // The type is a one byte field
437 *ptr = type;
438 ptr++;
439
440 // Add the routing of the message if the header size is 12, the maximum.
441 if (head_size == HEADER_12 && type != RTMP::USER) {
442 if (type != RTMP::AUDIO_DATA && type != RTMP::VIDEO_DATA) {
443 // log_network(_("The routing is: 0x%x"), routing);
444 std::uint32_t swapped = htonl(routing);
445 memcpy(ptr, &swapped, 4);
446 } else {
447 // FIXME: I have no idea why these two empty messages
448 // don't handle the routing field for 12 byte headers
449 // the same as all the other types.
450 std::uint8_t swapped = 0x1;
451 *ptr = swapped;
452 }
453 ptr += 4;
454 }
455 }
456
457 // Manually adjust the seek pointer since we added the data by
458 // walking our own temporary pointer, so none of the regular ways
459 // of setting the seek pointer are appropriate.
460 buf->setSeekPointer(buf->reference() + buf->size());
461
462 return buf;
463 }
464
465 #if 0
466 bool
467 RTMP::packetRead(cygnal::Buffer &buf)
468 {
469 GNASH_REPORT_FUNCTION;
470
471 // int packetsize = 0;
472 size_t amf_index, headersize;
473 std::uint8_t *ptr = buf.reference();
474 std::uint8_t *tooFar = ptr+buf.size();
475 AMF amf;
476
477 ptr += 1; // skip past the RTMP header byte
478
479 amf_index = *ptr & RTMP_INDEX_MASK;
480 headersize = headerSize(*buf.reference());
481 log_network(_("The Header size is: %d"), headersize);
482 log_network(_("The AMF index is: 0x%x"), amf_index);
483
484 if (headersize > 1) {
485 RTMP::rtmp_head_t *rthead = decodeHeader(ptr);
486 // if (packetsize) {
487 // log_network (_("Read first RTMP packet header of size %d"), packetsize);
488 // } else {
489 // log_error (_("Couldn't read first RTMP packet header"));
490 // return false;
491 // }
492 }
493
494 #if 1
495 std::uint8_t *end = buf.remove(0xc3);
496 #else
497 std::uint8_t *end = buf.find(0xc3);
498 log_network(_("END is %x"), (void *)end);
499 *end = '*';
500 #endif
501
502 // ptr = decodeHeader(ptr);
503 // ptr += headersize;
504
505 std::shared_ptr<cygnal::Element> el = amf.extractAMF(ptr, tooFar);
506 // el->dump();
507 el = amf.extractAMF(ptr, tooFar); // @@strk@@ : what's the +1 for ?
508 // el->dump();
509 log_network(_("Reading AMF packets till we're done..."));
510 // buf->dump();
511 while (ptr < end) {
512 std::shared_ptr<cygnal::Element> el = amf.extractProperty(ptr, tooFar);
513 addProperty(el);
514 // el->dump();
515 }
516 ptr += 1;
517 size_t actual_size = static_cast<size_t>(_header.bodysize - AMF_HEADER_SIZE);
518 log_network(_("Total size in header is %d, buffer size is: %d"),
519 _header.bodysize, buf.size());
520 // buf->dump();
521 if (buf.size() < actual_size) {
522 log_network(_("FIXME: MERGING"));
523 // buf = _handler->merge(buf); FIXME needs to use shared_ptr
524 }
525 while ((ptr - buf.begin()) < static_cast<int>(actual_size)) {
526 std::shared_ptr<cygnal::Element> el = amf.extractProperty(ptr, tooFar);
527 addProperty(el);
528 // el->dump(); // FIXME: dump the AMF objects as they are read in
529 }
530
531 // dump();
532
533 std::shared_ptr<cygnal::Element> url = getProperty("tcUrl");
534 std::shared_ptr<cygnal::Element> file = getProperty("swfUrl");
535 std::shared_ptr<cygnal::Element> app = getProperty("app");
536
537 if (file) {
538 log_network(_("SWF file %s"), file->to_string());
539 }
540 if (url) {
541 log_network(_("is Loading video %s"), url->to_string());
542 }
543 if (app) {
544 log_network(_("is file name is %s"), app->to_string());
545 }
546
547 return true;
548 }
549 #endif
550
551 void
dump()552 RTMP::dump()
553 {
554 cerr << "RTMP packet contains " << _properties.size() << " variables."
555 << std::endl;
556 AMFProperties::iterator it;
557 for (it = _properties.begin(); it != _properties.end(); ++it) {
558 // const char *name = it->first;
559 cygnal::Element el = it->second;
560 el.dump();
561 }
562 }
563
564 // A Ping packet has two parameters that are always specified, and 2
565 // that are optional. The first two bytes are the ping type, as in
566 // rtmp_ping_e, the second is the ping target, which is always zero as
567 // far as we can tell.
568 //
569 // More notes from: http://jira.red5.org/confluence/display/docs/Ping
570 // type 0: Clear the stream. No third and fourth parameters. The
571 // second parameter could be 0. After the connection is established, a
572 // Ping 0,0 will be sent from server to client. The message will also
573 // be sent to client on the start of Play and in response of a Seek or
574 // Pause/Resume request. This Ping tells client to re-calibrate the
575 // clock with the timestamp of the next packet server sends.
576 //
577 // type 1: Tell the stream to clear the playing buffer.
578 // type 3: Buffer time of the client. The third parameter is the
579 // buffer time in millisecond.
580 // type 4: Reset a stream. Used together with type 0 in the case of
581 // VOD. Often sent before type 0.
582 // type 6: Ping the client from server. The second parameter is the
583 // current time.
584 // type 7: Pong reply from client. The second parameter is the time
585 // the server sent with his ping request.
586
587 // A RTMP Ping packet looks like this: "02 00 00 00 00 00 06 04 00 00
588 // 00 00 00 00 00 00 00 0", which is the Ping type byte, followed by
589 // two shorts that are the parameters. Only the first two paramters
590 // are required. This seems to be a ping message, 12 byte header,
591 // system channel 2
592 // 02 00 00 00 00 00 06 04 00 00 00 00 00 00 00 00 00 00
593 std::shared_ptr<RTMP::rtmp_ping_t>
decodePing(std::uint8_t * data)594 RTMP::decodePing(std::uint8_t *data)
595 {
596 // GNASH_REPORT_FUNCTION;
597
598 std::uint8_t *ptr = reinterpret_cast<std::uint8_t *>(data);
599 std::shared_ptr<rtmp_ping_t> ping(new rtmp_ping_t);
600
601 // All the data fields in a ping message are 2 bytes long.
602 std::uint16_t type = ntohs(*reinterpret_cast<std::uint16_t *>(ptr));
603 ping->type = static_cast<rtmp_ping_e>(type);
604 ptr += sizeof(std::uint16_t);
605
606 ping->target = ntohs(*reinterpret_cast<std::uint16_t *>(ptr));
607 ptr += sizeof(std::uint16_t);
608
609 ping->param1 = ntohs(*reinterpret_cast<std::uint16_t *>(ptr));
610 ptr += sizeof(std::uint16_t);
611
612 // ping->param2 = ntohs(*reinterpret_cast<std::uint16_t *>(ptr));
613 // ptr += sizeof(std::uint16_t);
614
615 // ping->param3 = ntohs(*reinterpret_cast<std::uint16_t *>(ptr));
616 ping->param3 = 0;
617
618 return ping;
619 }
620 std::shared_ptr<RTMP::rtmp_ping_t>
decodePing(cygnal::Buffer & buf)621 RTMP::decodePing(cygnal::Buffer &buf)
622 {
623 // GNASH_REPORT_FUNCTION;
624 return decodePing(buf.reference());
625 }
626
627 std::shared_ptr<RTMP::user_event_t>
decodeUserControl(cygnal::Buffer & buf)628 RTMP::decodeUserControl(cygnal::Buffer &buf)
629 {
630 // GNASH_REPORT_FUNCTION;
631 return decodeUserControl(buf.reference());
632 }
633
634 std::shared_ptr<RTMP::user_event_t>
decodeUserControl(std::uint8_t * data)635 RTMP::decodeUserControl(std::uint8_t *data)
636 {
637 // GNASH_REPORT_FUNCTION;
638
639 std::uint8_t *ptr = reinterpret_cast<std::uint8_t *>(data);
640 std::shared_ptr<user_event_t> user(new RTMP::user_event_t);
641
642 std::uint16_t type = ntohs(*reinterpret_cast<std::uint16_t *>(ptr));
643 std::uint16_t eventid = static_cast<user_control_e>(type);
644 ptr += sizeof(std::uint16_t);
645
646 std::uint32_t param1 = ntohl(*reinterpret_cast<std::uint32_t *>(ptr));
647 ptr += sizeof(std::uint32_t);
648
649 user->type = static_cast<user_control_e>(type);
650 user->param1 = param1;
651 user->param2 = 0;
652
653 // All events have only 4 bytes of data, except Set Buffer, which
654 // uses 8 bytes. The 4 bytes is usually the Stream ID except for
655 // Ping and Pong events, which carry a time stamp instead. We
656 // don't actually do anything here, we just parse the data.
657 switch (eventid) {
658 case STREAM_START:
659 case STREAM_EOF:
660 case STREAM_NODATA:
661 case STREAM_BUFFER:
662 {
663 std::uint32_t param2 = ntohl(*reinterpret_cast<std::uint32_t *>(ptr));
664 ptr += sizeof(std::uint32_t);
665 user->param2 = param2;
666 break;
667 }
668 case STREAM_LIVE:
669 case STREAM_PING:
670 case STREAM_PONG:
671 break;
672 default:
673 log_unimpl(_("Unknown User Control message %d!"), 1);
674 break;
675 };
676
677 return user;
678 }
679
680 // Stream Live -
681 // 02 00 00 00 00 00 06 04 00 00 00 00 00 04 00 00 00 01
682 // Stream Start -
683 // 02 00 00 00 00 00 06 04 00 00 00 00 00 00 00 00 00 01
684 std::shared_ptr<cygnal::Buffer>
encodeUserControl(user_control_e eventid,std::uint32_t data)685 RTMP::encodeUserControl(user_control_e eventid, std::uint32_t data)
686 {
687 // GNASH_REPORT_FUNCTION;
688
689 std::uint32_t swapped = 0;
690 std::shared_ptr<cygnal::Buffer> buf;
691 if (eventid == STREAM_BUFFER) {
692 buf.reset(new cygnal::Buffer(sizeof(std::uint16_t) * 5));
693 } else {
694 buf.reset(new cygnal::Buffer(sizeof(std::uint16_t) * 3));
695 }
696
697 // Set the type of this ping message
698 std::uint16_t typefield = htons(eventid);
699 *buf = typefield;
700
701 // All events have only 4 bytes of data, except Set Buffer, which
702 // uses 8 bytes. The 4 bytes is usually the Stream ID except for
703 // Ping and Pong events, which carry a time stamp instead. We
704 // don't actually do anything here, we just parse the data.
705 switch (eventid) {
706 case STREAM_START:
707 case STREAM_EOF:
708 case STREAM_NODATA:
709 swapped = data;
710 cygnal::swapBytes(&swapped, sizeof(std::uint32_t));
711 *buf += swapped;
712 break;
713 case STREAM_BUFFER:
714 buf.reset(new cygnal::Buffer(sizeof(std::uint16_t) * 5));
715 break;
716 case STREAM_LIVE:
717 case STREAM_PING:
718 case STREAM_PONG:
719 swapped = data;
720 cygnal::swapBytes(&swapped, sizeof(std::uint32_t));
721 *buf += swapped;
722 break;
723 default:
724 break;
725 };
726
727 return buf;
728 }
729
730 std::shared_ptr<RTMPMsg>
decodeMsgBody(std::uint8_t * data,size_t size)731 RTMP::decodeMsgBody(std::uint8_t *data, size_t size)
732 {
733 // GNASH_REPORT_FUNCTION;
734 cygnal::AMF amf_obj;
735 std::uint8_t *ptr = data;
736 std::uint8_t* tooFar = data + size;
737 bool status = false;
738 std::shared_ptr<RTMPMsg> msg(new RTMPMsg);
739
740 // The first data object is the method name of this object.
741 std::shared_ptr<cygnal::Element> name = amf_obj.extractAMF(ptr, tooFar);
742 if (name) {
743 ptr += name->getDataSize() + cygnal::AMF_HEADER_SIZE; // skip the length bytes too
744 } else {
745 log_error(_("Name field of RTMP Message corrupted!"));
746 msg.reset();
747 return msg;
748 }
749
750 // The stream ID is the second data object. All messages have
751 // these two objects at the minimum.
752 std::shared_ptr<cygnal::Element> streamid = amf_obj.extractAMF(ptr, tooFar);
753 if (streamid) {
754 // Most onStatus messages have the stream ID, but the Data
755 // Start onStatus message is basically just a marker that an
756 // FLV file is coming next.
757 if (streamid->getType() == cygnal::Element::NUMBER_AMF0) {
758 ptr += cygnal::AMF0_NUMBER_SIZE + 1;
759 }
760 } else {
761 log_error(_("Stream ID field of RTMP Message corrupted!"));
762 msg.reset();
763 return msg;
764 }
765
766 if (name->to_string() != nullptr) {
767 msg->setMethodName(name->to_string());
768 }
769
770 double swapped = streamid->to_number();
771 // swapBytes(&swapped, amf::AMF0_NUMBER_SIZE);
772 msg->setTransactionID(swapped);
773
774 if ((msg->getMethodName() == "_result") || (msg->getMethodName() == "_error") || (msg->getMethodName() == "onStatus")) {
775 status = true;
776 }
777
778 // Then there are a series of AMF objects, often a higher level
779 // ActionScript object with properties attached.
780 while (ptr < tooFar) {
781 // These pointers get deleted automatically when the msg
782 // object is deleted
783 std::shared_ptr<cygnal::Element> el = amf_obj.extractAMF(ptr, tooFar);
784 ptr += amf_obj.totalsize();
785 if (el == nullptr) {
786 break;
787 }
788 msg->addObject(el);
789 if (status) {
790 msg->checkStatus(el);
791 }
792 };
793
794 return msg;
795 }
796
797 std::shared_ptr<RTMPMsg>
decodeMsgBody(cygnal::Buffer & buf)798 RTMP::decodeMsgBody(cygnal::Buffer &buf)
799 {
800 // GNASH_REPORT_FUNCTION;
801 return decodeMsgBody(buf.reference(), buf.allocated());
802 }
803
804 // 02 00 00 00 00 00 04 01 00 00 00 00 00 00 10 00
805 // id=2 timestamp=0 body_size=4 content_type=0x01 dest=0
806 // Set chunk size 4096
807 std::shared_ptr<cygnal::Buffer>
encodeChunkSize(int size)808 RTMP::encodeChunkSize(int size)
809 {
810 GNASH_REPORT_FUNCTION;
811
812 std::uint32_t swapped = htonl(size);
813 std::shared_ptr<cygnal::Buffer> buf(new cygnal::Buffer(sizeof(std::uint32_t)));
814 *buf += swapped;
815
816 return buf;
817 }
818
819 void
decodeChunkSize()820 RTMP::decodeChunkSize()
821 {
822 GNASH_REPORT_FUNCTION;
823 // _chunksize[rthead->channel] = ntohl(*reinterpret_cast<std::uint32_t *>(ptr + rthead->head_size));
824 // log_network("Setting packet chunk size to %d.", _chunksize);
825 log_unimpl(__PRETTY_FUNCTION__);
826 }
827
828 std::shared_ptr<cygnal::Buffer>
encodeBytesRead()829 RTMP::encodeBytesRead()
830 {
831 GNASH_REPORT_FUNCTION;
832 log_unimpl(__PRETTY_FUNCTION__);
833 return std::shared_ptr<cygnal::Buffer>((cygnal::Buffer*)nullptr);
834 }
835
836 void
decodeBytesRead()837 RTMP::decodeBytesRead()
838 {
839 GNASH_REPORT_FUNCTION;
840 log_unimpl(__PRETTY_FUNCTION__);
841 }
842
843 std::shared_ptr<cygnal::Buffer>
encodeServer()844 RTMP::encodeServer()
845 {
846 GNASH_REPORT_FUNCTION;
847 log_unimpl(__PRETTY_FUNCTION__);
848 return std::shared_ptr<cygnal::Buffer>((cygnal::Buffer*)nullptr);
849 }
850
851 void
decodeServer()852 RTMP::decodeServer()
853 {
854 GNASH_REPORT_FUNCTION;
855 log_unimpl(__PRETTY_FUNCTION__);
856 }
857
858 std::shared_ptr<cygnal::Buffer>
encodeClient()859 RTMP::encodeClient()
860 {
861 GNASH_REPORT_FUNCTION;
862 log_unimpl(__PRETTY_FUNCTION__);
863 return std::shared_ptr<cygnal::Buffer>((cygnal::Buffer*)nullptr);
864 }
865
866 void
decodeClient()867 RTMP::decodeClient()
868 {
869 GNASH_REPORT_FUNCTION;
870 log_unimpl(__PRETTY_FUNCTION__);
871 }
872
873 std::shared_ptr<cygnal::Buffer>
encodeAudioData()874 RTMP::encodeAudioData()
875 {
876 GNASH_REPORT_FUNCTION;
877 log_unimpl(__PRETTY_FUNCTION__);
878 return std::shared_ptr<cygnal::Buffer>((cygnal::Buffer*)nullptr);
879 }
880
881 void
decodeAudioData()882 RTMP::decodeAudioData()
883 {
884 GNASH_REPORT_FUNCTION;
885 log_unimpl(__PRETTY_FUNCTION__);
886 }
887
888 std::shared_ptr<cygnal::Buffer>
encodeVideoData()889 RTMP::encodeVideoData()
890 {
891 GNASH_REPORT_FUNCTION;
892 log_unimpl(__PRETTY_FUNCTION__);
893 return std::shared_ptr<cygnal::Buffer>((cygnal::Buffer*)nullptr);
894 }
895
896 void
decodeVideoData()897 RTMP::decodeVideoData()
898 {
899 GNASH_REPORT_FUNCTION;
900 log_unimpl(__PRETTY_FUNCTION__);
901 }
902
903 std::shared_ptr<cygnal::Buffer>
encodeNotify()904 RTMP::encodeNotify()
905 {
906 GNASH_REPORT_FUNCTION;
907 log_unimpl(__PRETTY_FUNCTION__);
908 return std::shared_ptr<cygnal::Buffer>((cygnal::Buffer*)nullptr);
909 }
910
911 void
decodeNotify()912 RTMP::decodeNotify()
913 {
914 GNASH_REPORT_FUNCTION;
915 log_unimpl(__PRETTY_FUNCTION__);
916 }
917
918 std::shared_ptr<cygnal::Buffer>
encodeSharedObj()919 RTMP::encodeSharedObj()
920 {
921 GNASH_REPORT_FUNCTION;
922 log_unimpl(__PRETTY_FUNCTION__);
923 return std::shared_ptr<cygnal::Buffer>((cygnal::Buffer*)nullptr);
924 }
925
926 void
decodeSharedObj()927 RTMP::decodeSharedObj()
928 {
929 GNASH_REPORT_FUNCTION;
930 log_unimpl(__PRETTY_FUNCTION__);
931 }
932
933 std::shared_ptr<cygnal::Buffer>
encodeInvoke()934 RTMP::encodeInvoke()
935 {
936 GNASH_REPORT_FUNCTION;
937 log_unimpl(__PRETTY_FUNCTION__);
938 return std::shared_ptr<cygnal::Buffer>((cygnal::Buffer*)nullptr);
939 }
940 void
decodeInvoke()941 RTMP::decodeInvoke()
942 {
943 GNASH_REPORT_FUNCTION;
944 log_unimpl(__PRETTY_FUNCTION__);
945 }
946
947 // Send a message, usually a single ActionScript object. This message
948 // may be broken down into a series of packets on a regular byte
949 // interval. The byte boundary defaults to 128 bytes (video data), but can
950 // be changed by the ChunkSize() command.
951 bool
sendMsg(int channel,rtmp_headersize_e head_size,size_t total_size,content_types_e type,RTMPMsg::rtmp_source_e routing,cygnal::Buffer & data)952 RTMP::sendMsg(int channel, rtmp_headersize_e head_size,
953 size_t total_size, content_types_e type,
954 RTMPMsg::rtmp_source_e routing, cygnal::Buffer &data)
955 {
956 // GNASH_REPORT_FUNCTION;
957 return sendMsg(getFileFd(), channel, head_size, total_size, type, routing, data.reference(), data.allocated());
958 }
959
960 bool
sendMsg(int fd,int channel,rtmp_headersize_e head_size,size_t total_size,content_types_e type,RTMPMsg::rtmp_source_e routing,cygnal::Buffer & data)961 RTMP::sendMsg(int fd, int channel, rtmp_headersize_e head_size,
962 size_t total_size, content_types_e type,
963 RTMPMsg::rtmp_source_e routing, cygnal::Buffer &data)
964 {
965 // GNASH_REPORT_FUNCTION;
966 return sendMsg(fd, channel, head_size, total_size, type, routing, data.reference(), data.allocated());
967 }
968
969
970 bool
sendMsg(int channel,rtmp_headersize_e head_size,size_t total_size,content_types_e type,RTMPMsg::rtmp_source_e routing,std::uint8_t * data,size_t size)971 RTMP::sendMsg(int channel, rtmp_headersize_e head_size,
972 size_t total_size, content_types_e type,
973 RTMPMsg::rtmp_source_e routing, std::uint8_t *data, size_t size)
974 {
975 // GNASH_REPORT_FUNCTION;
976 return sendMsg(getFileFd(), channel, head_size, total_size, type, routing, data, size);
977 }
978
979 bool
sendMsg(int fd,int channel,rtmp_headersize_e head_size,size_t total_size,content_types_e type,RTMPMsg::rtmp_source_e routing,std::uint8_t * data,size_t size)980 RTMP::sendMsg(int fd, int channel, rtmp_headersize_e head_size,
981 size_t total_size, content_types_e type,
982 RTMPMsg::rtmp_source_e routing, std::uint8_t *data, size_t size)
983 {
984 // GNASH_REPORT_FUNCTION;
985 int ret = 0;
986
987 #if 0
988 // We got some bogus parameters
989 if (total_size || size 0) {
990 log_error(_("Bogus size parameter in %s!"), __PRETTY_FUNCTION__);
991 return false;
992 }
993 #endif
994
995 // FIXME: This is a temporary hack to make it easier to read hex
996 // dumps from network packet sniffing so all the data is in one
997 // buffer. This matches the Adobe behaviour, but for Gnash/Cygnal,
998 // is a performance hit.
999
1000 // Figure out how many packets it'll take to send this data.
1001 int pkts = size/_chunksize[channel];
1002 std::shared_ptr<cygnal::Buffer> bigbuf(new cygnal::Buffer(size+pkts+100));
1003
1004 // This builds the full header, which is required as the first part
1005 // of the packet.
1006 std::shared_ptr<cygnal::Buffer> head = encodeHeader(channel, head_size,
1007 total_size, type, routing);
1008 // When more data is sent than fits in the chunksize for this
1009 // channel, it gets broken into chunksize pieces, and each piece
1010 // after the first packet is sent gets a one byte header instead.
1011 #if 0
1012 std::shared_ptr<cygnal::Buffer> cont_head = encodeHeader(channel, RTMP::HEADER_1);
1013 #else
1014 std::shared_ptr<cygnal::Buffer> cont_head(new cygnal::Buffer(1));
1015 std::uint8_t foo = 0xc3;
1016 *cont_head = foo;
1017 #endif
1018
1019 size_t partial = _chunksize[channel];
1020 size_t nbytes = 0;
1021
1022 // First send the full header, afterwards we only use continuation
1023 // headers, which are only one byte.
1024 #if 0
1025 ret = writeNet(fd, head->reference(), head->size());
1026 if (ret == -1) {
1027 log_error(_("Couldn't write the full 12 byte RTMP header!"));
1028 return false;
1029 } else {
1030 log_network(_("Wrote the full 12 byte RTMP header."));
1031 }
1032 #else
1033 *bigbuf = head;
1034 #endif
1035
1036 // if (data && size) {
1037 // now send the data
1038 while (nbytes <= size) {
1039 // The last bit of data is usually less than the packet size,
1040 // so we write less data of course.
1041 if ((size - nbytes) < _chunksize[channel]) {
1042 partial = size - nbytes;
1043 }
1044 // After the first packet, only send the single byte
1045 // continuation packet.
1046 if (nbytes > 0) {
1047 #if 0
1048 ret = writeNet(fd, *cont_head);
1049 if (ret == -1) {
1050 log_error(_("Couldn't write the full 1 byte RTMP continuation header!"));
1051 return false;
1052 } else {
1053 log_network(_("Wrote the full 1 byte RTMP continuation header"));
1054 }
1055 #else
1056 *bigbuf += cont_head;
1057 #endif
1058 }
1059 // }
1060
1061 // write the data to the client
1062 #if 0
1063 ret = writeNet(fd, data + nbytes, partial);
1064 if (ret == -1) {
1065 log_error(_("Couldn't write the RTMP body!"));
1066 return false;
1067 } else {
1068 log_network(_("Wrote %d bytes of the RTMP body, %d bytes left."),
1069 ret, size-nbytes);
1070 }
1071 #else
1072 if (data != nullptr) {
1073 bigbuf->append(data + nbytes, partial);
1074 }
1075 #endif
1076 // adjust the accumulator.
1077 nbytes += _chunksize[channel];
1078 };
1079
1080 #if 1
1081 // bigbuf->dump();
1082
1083 ret = writeNet(fd, *bigbuf);
1084 if (ret == -1) {
1085 log_error(_("Couldn't write the RTMP packet!"));
1086 return false;
1087 } else {
1088 log_network(_("Wrote the RTMP packet."));
1089 }
1090 #endif
1091
1092 return true;
1093 }
1094
1095 #if 0
1096 // Send a Msg, and expect a response back of some kind.
1097 RTMPMsg *
1098 RTMP::sendRecvMsg(cygnal::Buffer &bufin)
1099 {
1100 GNASH_REPORT_FUNCTION;
1101 // size_t total_size = buf2->size() - 6; // FIXME: why drop 6 bytes ?
1102 std::shared_ptr<cygnal::Buffer> head = encodeHeader(amf_index, head_size, total_size,
1103 type, routing);
1104 // int ret = 0;
1105 int ret = writeNet(head->reference(), head->size()); // send the header first
1106 // if (netDebug()) {
1107 // head->dump();
1108 // bufin->dump();
1109 // }
1110 ret = sendMsg(bufin);
1111
1112 RTMP::rtmp_head_t *rthead = 0;
1113 RTMPMsg *msg = 0;
1114 std::shared_ptr<cygnal::Buffer> buf;
1115 std::uint8_t *ptr = 0;
1116
1117
1118 buf = recvMsg(1); // use a 1 second timeout
1119 if (buf == 0) {
1120 return 0;
1121 }
1122 RTMP::queues_t *que = split(buf);
1123 // CQue *que = split(buf);
1124 while (que->size()) {
1125 // ptr = que->pop();
1126 cerr << "QUE SIZE: " << que->size() << endl;
1127 ptr = que->front()->pop()->reference();
1128 que->pop_front();
1129 // ptr = buf->reference();
1130 rthead = decodeHeader(ptr);
1131
1132 if (rthead) {
1133 if (rthead->head_size == 1) {
1134 log_network(_("Response header: %s"), hexify(ptr, 7, false));
1135 } else {
1136 log_network(_("Response header: %s"),
1137 hexify(ptr, rthead->head_size, false));
1138 }
1139 if (rthead->type <= RTMP::FLV_DATA) {
1140 log_error(_("Processing message of type %s!"),
1141 content_str[rthead->type]);
1142 }
1143
1144 switch (rthead->type) {
1145 case CHUNK_SIZE:
1146 log_networ(_("Got CHUNK_SIZE packet!!!"));
1147 _chunksize[rthead->channel] = ntohl(*reinterpret_cast<std::uint32_t *>(ptr + rthead->head_size));
1148 log_network(_("Setting packet chunk size to %d."), _chunksize);
1149 // decodeChunkSize();
1150 break;
1151 case BYTES_READ:
1152 log_network("Got Bytes Read packet!!!");
1153 // decodeBytesRead();
1154 break;
1155 case PING:
1156 {
1157 RTMP::rtmp_ping_t *ping = decodePing(ptr);
1158 log_network(_("FIXME: Ping type is: %d, ignored for now"),
1159 ping->type);
1160 switch (ping->type) {
1161 case PING_CLEAR:
1162 break;
1163 case PING_PLAY:
1164 break;
1165 case PING_TIME:
1166 break;
1167 case PING_RESET:
1168 break;
1169 case PING_CLIENT:
1170 break;
1171 case PONG_CLIENT:
1172 break;
1173 default:
1174 break;
1175 };
1176 break;
1177 }
1178 case SERVER:
1179 {
1180 log_network(_("Got SERVER packet!!!"));
1181 Buffer server_data(rthead->bodysize);
1182 server_data.copy(ptr + rthead->head_size, rthead->bodysize);
1183 // decodeServer();
1184 break;
1185 }
1186 case CLIENT:
1187 {
1188 log_network(_("Got CLIENT packet!!!"));
1189 Buffer client_data(rthead->bodysize);
1190 client_data.copy(ptr + rthead->head_size, rthead->bodysize);
1191 // decodeClient();
1192 break;
1193 }
1194 case VIDEO_DATA:
1195 {
1196 log_network(_("Got VIDEO packets!!!"));
1197 std::shared_ptr<cygnal::Buffer> frame;
1198 do {
1199 frame = recvMsg(1); // use a 1 second timeout
1200 if (frame) {
1201 _queues[rthead->channel].push(frame);
1202 }
1203 // decodeVideoData();
1204 } while (frame);
1205 _queues->dump();
1206 break;
1207 }
1208 case NOTIFY:
1209 // decodeNotify();
1210 break;
1211 case SHARED_OBJ:
1212 log_network(_("Got Shared Object packet!!!"));
1213 // decodeSharedObj();
1214 break;
1215 case INVOKE:
1216 msg = decodeMsgBody(ptr + rthead->head_size, rthead->bodysize);
1217 // msg->dump();
1218 if (msg) {
1219 log_network(_("%s: Msg status is: %d: %s, name is %s, size is %d"), __FUNCTION__,
1220 msg->getStatus(), status_str[msg->getStatus()],
1221 msg->getMethodName(), msg->size());
1222 if (msg->getMethodName() == "onBWDone") {
1223 log_network(_("Got onBWDone packet!!!"));
1224 continue;
1225 }
1226 return msg;
1227 } else {
1228 log_error(_("Couldn't decode message body for type %s!"),
1229 content_str[rthead->type]);
1230 }
1231 // decodeInvoke();
1232 break;
1233 case AUDIO_DATA:
1234 // decodeAudioData();
1235 break;
1236 default:
1237 break;
1238 } // end of switch
1239 }
1240 // if (_queues[rthead->channel] != 0) {
1241 // _queues[rthead->channel].push(chunk);
1242 // }
1243 // ptr += rthead->head_size + rthead->bodysize;
1244 };
1245
1246 return msg;
1247 }
1248 #endif
1249
1250 // Receive a message, which is a series of AMF elements, seperated
1251 // by a one byte header at regular byte intervals. (128 bytes for
1252 // video data by default). Each message main contain multiple packets.
1253 std::shared_ptr<cygnal::Buffer>
recvMsg()1254 RTMP::recvMsg()
1255 {
1256 // GNASH_REPORT_FUNCTION;
1257 return recvMsg(getFileFd());
1258 }
1259
1260 // Read big chunks of NETBUFSIZE, which is the default for a Buffer as it's
1261 // more efficient. As these reads may cross packet boundaries, and they may
1262 // also include the RTMP header every _chunksize bytes, this raw data will
1263 // need to be processed later on.
1264 std::shared_ptr<cygnal::Buffer>
recvMsg(int fd)1265 RTMP::recvMsg(int fd)
1266 {
1267 // GNASH_REPORT_FUNCTION;
1268
1269 int ret = 0;
1270 //bool nopacket = true;
1271
1272 // Read really big packets, they get split into the smaller ones when 'split'
1273 std::shared_ptr<cygnal::Buffer> buf(new cygnal::Buffer(3074));
1274 do {
1275 ret = readNet(fd, buf->reference()+ret, buf->size()-ret, _timeout);
1276 // We got data. Resize the buffer if necessary.
1277 if (ret > 0) {
1278 buf->setSeekPointer(buf->reference() + ret);
1279 }
1280 // the read timed out as there was no data, but the socket is still open.
1281 if (ret == 0) {
1282 log_network(_("no data for fd #%d, done reading this packet, read %d bytes..."), fd,
1283 buf->allocated());
1284 buf.reset();
1285 break;
1286 }
1287 if ((ret == 1) && (*(buf->reference()) == 0xff)) {
1288 log_network(_("Got an empty packet from the server at line %d"),
1289 __LINE__);
1290 ret = 0;
1291 buf->clear();
1292 continue;
1293 }
1294 // ret is "no position" when the socket is closed from the other end of the connection,
1295 // so we're done.
1296 if ((ret == static_cast<int>(std::string::npos)) || (ret == -1)) {
1297 log_network(_("socket for fd #%d was closed..."), fd);
1298 buf.reset();
1299 break;
1300 }
1301 } while (ret <= 0);
1302
1303 // if (netDebug()) {
1304 // buf->dump();
1305 // }
1306
1307 return buf;
1308 }
1309
1310 // Split a large buffer into multiple smaller ones of the default chunksize
1311 // of 128 bytes. We read network data in big chunks because it's more efficient,
1312 // but RTMP uses a weird scheme of a standard header, and then every chunksize
1313 // bytes another 1 byte RTMP header. The header itself is not part of the byte
1314 // count.
1315 std::shared_ptr<RTMP::queues_t>
split(cygnal::Buffer & buf)1316 RTMP::split(cygnal::Buffer &buf)
1317 {
1318 // GNASH_REPORT_FUNCTION;
1319 return split(buf.reference(), buf.allocated());
1320 }
1321
1322 std::shared_ptr<RTMP::queues_t>
split(std::uint8_t * data,size_t size)1323 RTMP::split(std::uint8_t *data, size_t size)
1324 {
1325 // GNASH_REPORT_FUNCTION;
1326
1327 if (data == nullptr) {
1328 log_error(_("Buffer pointer is invalid."));
1329 }
1330
1331 std::shared_ptr<RTMP::queues_t> channels(new RTMP::queues_t);
1332
1333 // split the buffer at the chunksize boundary
1334 std::uint8_t *ptr = nullptr;
1335 std::shared_ptr<rtmp_head_t> rthead(new rtmp_head_t);
1336 size_t pktsize = 0;
1337 //size_t nbytes = 0;
1338
1339 ptr = data;
1340 std::shared_ptr<cygnal::Buffer> chunk;
1341 // There may be multiple messages in this Buffer, so we walk a
1342 // temp pointer through the contents of the Buffer.
1343 while ((ptr - data) < static_cast<int>(size)) {
1344 // Decode the header of the packet to get the header size, the
1345 // body size, and the channel, all of which we need.
1346 rthead = decodeHeader(ptr);
1347 if (!rthead) {
1348 channels.reset();
1349 return channels;
1350 }
1351 // System channel messages are always on channel 2, and get
1352 // processed differently later on.
1353 if (rthead->channel == RTMP_SYSTEM_CHANNEL) {
1354 log_network(_("Got a message on the system channel!"), __FUNCTION__);
1355 }
1356 // If the header size is 4 bytes or less, then reuse the body size
1357 // of the last message for this channel.
1358 if (rthead->head_size <= 4) {
1359 rthead->bodysize = _lastsize[rthead->channel];
1360 }
1361 // Make sure the header size we just got is in range. We can
1362 // proceed as long as it is in range, but if it is out of
1363 // range, we can't really continue.
1364 if (rthead->head_size <= RTMP_MAX_HEADER_SIZE) {
1365 // Any packet with a header size greater than 1 is a
1366 // always a new RTMP message, so create a new cygnal::Buffer to
1367 // hold all the data.
1368 if ((rthead->head_size >= 1) || (ptr == data)) {
1369 // cerr << "New packet for channel #" << rthead->channel << " of size "
1370 // << (rthead->head_size + rthead->bodysize) << endl;
1371 // give it some memory to store data in. We store
1372 chunk.reset(new cygnal::Buffer(rthead->bodysize + rthead->head_size + 1));
1373 // Each RTMP connection has 64 channels, so we store
1374 // the header with the data so that info is accessible
1375 // via the Buffer for processing later. All the data
1376 // goes in a queue for each channel.
1377 _queues[rthead->channel].push(chunk);
1378 } else {
1379 // Use the existing Buffer for this packet, as it's a
1380 // continuation messages for an existing packet. Leave
1381 // the message in the queue, we just want access to
1382 // the Buffer.
1383 chunk = _queues[rthead->channel].peek();
1384 }
1385 #if 1
1386 // Red5 version 5 sends out PING messages with a 1 byte header. I think this
1387 // may be a bug in Red5, but we should handle it anyway.
1388 if (chunk == nullptr) {
1389 cerr << "Chunk wasn't allocated! " << (rthead->bodysize + rthead->head_size)
1390 << std::endl;
1391 chunk.reset(new cygnal::Buffer(rthead->bodysize + rthead->head_size));
1392 chunk->clear(); // FIXME: temporary debug only, should be unnecessary
1393 _queues[rthead->channel].push(chunk);
1394 }
1395 #endif
1396 // Many RTMP messages are smaller than the chunksize, so
1397 // they're easy. Each channel may have a different
1398 // chunksize, just to keep things interesting. The
1399 // chunksize for a channel is changed by the Chunksize
1400 // RTMP command.
1401 if (chunk->size() <= _chunksize[rthead->channel]) {
1402 // Since the total RTMP message size is less than the
1403 // chunksize for this channel, the packet size is the
1404 // total message size.
1405 pktsize = chunk->size();
1406 } else {
1407 // This RTMP message is larger than the chunksize for
1408 // this channel, so the packet size is smaller than
1409 // the total message size. The header bytes aren't
1410 // counted as part of the message size, so we read the
1411 // header plus all the data up to the channel chunksize.
1412 if (rthead->head_size > 1) {
1413 pktsize = rthead->head_size + _chunksize[rthead->channel];
1414 } else {
1415 // One byte headers are continuation messages for
1416 // existing data. There may be multiple
1417 // continuation messages to complete the RTMP
1418 // messagem, so all packets are read up to the
1419 // chunksize but the last packet of the sequence.
1420 if (chunk->spaceLeft() < _chunksize[rthead->channel]) {
1421 // don't store the continutation header,jusdt
1422 // append the data,
1423 pktsize = chunk->spaceLeft();
1424 } else {
1425 pktsize = rthead->head_size + (chunk->size() - _chunksize[rthead->channel]);
1426 }
1427 }
1428 }
1429
1430 // Now that we calculated the packet size, range check it
1431 // for sanity.
1432 if (pktsize <= (_chunksize[rthead->channel] + RTMP_MAX_HEADER_SIZE)) {
1433 // Skip the header for all but the first packet. The rest are just to
1434 // complete all the data up to the body size from the header.
1435 // cerr << _queues[rthead->channel].size() << " messages in queue for channel "
1436 // << rthead->channel << endl;
1437 if ((rthead->head_size == 1) && (ptr != data)) {
1438 // cerr << "FOLLOWING PACKET!" << " for channel " << rthead->channel << endl;
1439 // cerr << "Space Left in buffer for channel " << rthead->channel << " is: "
1440 // << chunk->spaceLeft() << endl;
1441 ptr += rthead->head_size;
1442 // } else {
1443 // cerr << "FIRST PACKET!" << " for channel " << rthead->channel << endl;
1444 }
1445 // This is a queue of channels with active messages. This is a
1446 // much smaller list to traverse when processing data than all 64 channels.
1447 if (pktsize < 0xffffff) {
1448 // cerr << "FIXME5: " << hexify(ptr, pktsize, true) << endl;
1449 // If the packet size is in range, then append the
1450 // data to the existing data to complete the message.
1451 chunk->append(ptr, pktsize);
1452 _lastsize[rthead->channel] = rthead->bodysize;
1453 // cerr << "Adding data to existing packet for channel #" << rthead->channel
1454 // << ", read " << pktsize << " bytes." << endl;
1455 // FIXME: why is this off by 1 byte ?
1456 ptr += pktsize - 1;
1457 } else {
1458 log_error(_("Packet size out of range! %d, %d"),
1459 rthead->bodysize, pktsize);
1460 }
1461 // add this packet to the list of active channels if we've gotten
1462 // all the data.
1463 // if (chunk->allocated() - 5 >= rthead->bodysize) {
1464 // cerr << "Bodysize: " << rthead->bodysize
1465 // << " Allocated Chunksize: " << chunk->allocated() << endl;
1466 channels->push_back(&_queues[rthead->channel]);
1467 // }
1468 } else {
1469 log_error(_("RTMP packet size is out of range! %d, %d"),
1470 rthead->bodysize, pktsize);
1471 break;
1472 }
1473 } else {
1474 log_error(_("RTMP header size is out of range! %d"),
1475 rthead->head_size);
1476 break;
1477 }
1478 }
1479
1480 return channels;
1481 }
1482
1483
1484 } // end of gnash namespace
1485
1486
1487 // local Variables:
1488 // mode: C++
1489 // indent-tabs-mode: nil
1490 // End:
1491