1 // Copyright Maciej Sobczak 2008-2019.
2 // This file is part of YAMI4.
3 //
4 // YAMI4 is free software: you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation, either version 3 of the License, or
7 // (at your option) any later version.
8 //
9 // YAMI4 is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with YAMI4.  If not, see <http://www.gnu.org/licenses/>.
16 
17 #include "channel.h"
18 #include "allocator.h"
19 #include "fatal_errors.h"
20 #include "incoming_frame.h"
21 #include "incoming_message_frame_list.h"
22 #include "outgoing_frame.h"
23 #include "serialization.h"
24 #include <cstring>
25 
26 // selected per platform
27 #include <mutex.h>
28 
29 using namespace yami;
30 using namespace details;
31 
common_init(allocator & alloc,mutex & mtx,const options & configuration_options,const core::parameters * overriding_options,core::incoming_message_dispatch_function incoming_message_callback,void * incoming_message_hint,core::event_notification_function event_notification_callback,void * event_notification_hint,core::io_error_function io_error_callback,void * io_error_callback_hint)32 void channel::common_init(allocator & alloc, mutex & mtx,
33     const options & configuration_options,
34     const core::parameters * overriding_options,
35     core::incoming_message_dispatch_function incoming_message_callback,
36     void * incoming_message_hint,
37     core::event_notification_function event_notification_callback,
38     void * event_notification_hint,
39     core::io_error_function io_error_callback,
40     void * io_error_callback_hint)
41 {
42     configuration_options_ = configuration_options;
43     configuration_options_.override_with_user_values(overriding_options);
44     alloc_ = &alloc;
45     mtx_ = &mtx;
46     target_ = NULL;
47     ref_count_ = 1;
48     mode_ = operational;
49     direction_ = input;
50     fd_ = empty_io_descriptor;
51     selector_index_ = -1;
52     frame_buffer_ = NULL;
53     buffer_capacity_ = 0; // this will be set lazily
54     buffer_available_ = 0;
55     buffer_consumed_ = 0;
56     target_address_ = NULL;
57     datagram_buffer_ = NULL;
58     output_frame_offset_ = 0;
59     first_outgoing_frame_ = NULL;
60     last_outgoing_frame_ = NULL;
61     incoming_state_ = read_frame_header;
62     read_offset_ = 0;
63     current_message_id_ = 0;
64     current_frame_number_ = 0;
65     current_message_header_size_ = 0;
66     current_frame_payload_size_ = 0;
67     current_frame_payload_ = NULL;
68     last_frame_ = false;
69     first_incoming_frame_list_ = NULL;
70     last_incoming_frame_list_ = NULL;
71     pending_outgoing_bytes_ = 0;
72     incoming_message_callback_ = incoming_message_callback;
73     incoming_message_hint_ = incoming_message_hint;
74     event_notification_callback_ = event_notification_callback;
75     event_notification_hint_ = event_notification_hint;
76     io_error_callback_ = io_error_callback;
77     io_error_callback_hint_ = io_error_callback_hint;
78 
79 #ifdef YAMI4_WITH_OPEN_SSL
80     ssl_ = NULL;
81 #endif // YAMI4_WITH_OPEN_SSL
82 }
83 
84 // to be synchronized by caller
clean()85 void channel::clean()
86 {
87     close_connection();
88 
89     if (target_ != NULL)
90     {
91         alloc_->deallocate(target_);
92     }
93 
94     if (protocol_ == proto_udp)
95     {
96         if (target_address_ != NULL)
97         {
98             alloc_->deallocate(target_address_);
99         }
100 
101         if (datagram_buffer_ != NULL)
102         {
103             alloc_->deallocate(datagram_buffer_);
104         }
105     }
106 
107     if (frame_buffer_ != NULL)
108     {
109         alloc_->deallocate(frame_buffer_);
110     }
111 
112     clean_outgoing_frames(first_outgoing_frame_);
113     clean_incoming_messages();
114 
115 #ifdef YAMI4_WITH_OPEN_SSL
116     if (ssl_ != NULL)
117     {
118         SSL_free(ssl_);
119     }
120 #endif // YAMI4_WITH_OPEN_SSL
121 }
122 
123 // should be called outside of critical section
init(allocator & alloc,mutex & mtx,const options & configuration_options,const core::parameters * overriding_options,const char * target,core::incoming_message_dispatch_function incoming_message_callback,void * incoming_message_hint,core::event_notification_function event_notification_callback,void * event_notification_hint,core::io_error_function io_error_callback,void * io_error_callback_hint)124 core::result channel::init(allocator & alloc, mutex & mtx,
125     const options & configuration_options,
126     const core::parameters * overriding_options,
127     const char * target,
128     core::incoming_message_dispatch_function incoming_message_callback,
129     void * incoming_message_hint,
130     core::event_notification_function event_notification_callback,
131     void * event_notification_hint,
132     core::io_error_function io_error_callback,
133     void * io_error_callback_hint)
134 {
135     common_init(alloc, mtx, configuration_options, overriding_options,
136         incoming_message_callback, incoming_message_hint,
137         event_notification_callback, event_notification_hint,
138         io_error_callback, io_error_callback_hint);
139 
140     const std::size_t target_length = std::strlen(target);
141 
142     target_ = static_cast<char *>(alloc_->allocate(target_length + 1));
143 
144     core::result res;
145     if (target_ != NULL)
146     {
147         std::strcpy(target_, target);
148 
149         res = connect();
150 
151         if (res != core::ok)
152         {
153             alloc_->deallocate(target_);
154             target_ = NULL;
155         }
156     }
157     else
158     {
159         res = core::no_memory;
160     }
161 
162     return res;
163 }
164 
init(allocator & alloc,mutex & mtx,const options & configuration_options,char * target,io_descriptor_type fd,protocol prot,std::size_t preferred_frame_size,core::incoming_message_dispatch_function incoming_message_callback,void * incoming_message_hint,core::event_notification_function event_notification_callback,void * event_notification_hint,core::io_error_function io_error_callback,void * io_error_callback_hint)165 core::result channel::init(allocator & alloc, mutex & mtx,
166     const options & configuration_options,
167     char * target,
168     io_descriptor_type fd, protocol prot, std::size_t preferred_frame_size,
169     core::incoming_message_dispatch_function incoming_message_callback,
170     void * incoming_message_hint,
171     core::event_notification_function event_notification_callback,
172     void * event_notification_hint,
173     core::io_error_function io_error_callback,
174     void * io_error_callback_hint)
175 {
176     common_init(alloc, mtx, configuration_options, NULL,
177         incoming_message_callback, incoming_message_hint,
178         event_notification_callback, event_notification_hint,
179         io_error_callback, io_error_callback_hint);
180 
181     // ownership transfer
182     target_ = target;
183 
184     alloc_ = &alloc;
185     fd_ = fd;
186     protocol_ = prot;
187     direction_ = inout;
188     preferred_frame_size_ = preferred_frame_size;
189 
190     core::result res = core::ok;
191 
192     if (prot == proto_tcp || prot == proto_unix || prot == proto_file)
193     {
194         // allocate a small read buffer for stream-oriented connections
195 
196         // Note: this function is called when an existing channel is added
197         // to the group, which typically happens only for channels
198         // that are created after being accepted by TCP listeners.
199         // The remaining stream protocols (unix/file) are handled here
200         // defensively and for completeness.
201 
202         switch (prot)
203         {
204         case proto_tcp:
205             buffer_capacity_ = configuration_options.tcp_frame_size;
206             break;
207         case proto_unix:
208             buffer_capacity_ = configuration_options.unix_frame_size;
209             break;
210         case proto_file:
211             buffer_capacity_ = configuration_options.file_frame_size;
212             break;
213         }
214 
215         frame_buffer_ = static_cast<char *>(
216             alloc_->allocate(buffer_capacity_));
217         if (frame_buffer_ == NULL)
218         {
219             res = core::no_memory;
220         }
221     }
222 
223     return res;
224 }
225 
226 #ifdef YAMI4_WITH_OPEN_SSL
set_client_ssl(SSL * ssl)227 void channel::set_client_ssl(SSL * ssl)
228 {
229     ssl_ = ssl;
230 
231     SSL_set_connect_state(ssl_);
232 }
233 
set_server_ssl(SSL * ssl)234 void channel::set_server_ssl(SSL * ssl)
235 {
236     ssl_ = ssl;
237 
238     SSL_set_accept_state(ssl_);
239 }
240 #endif // YAMI4_WITH_OPEN_SSL
241 
connect()242 core::result channel::connect()
243 {
244     core::result res;
245 
246     const char tcp_prefix[] = "tcp://";
247     const std::size_t tcp_prefix_size = sizeof(tcp_prefix) - 1;
248 
249     const char udp_prefix[] = "udp://";
250     const std::size_t udp_prefix_size = sizeof(udp_prefix) - 1;
251 
252     const char unix_prefix[] = "unix://";
253     const std::size_t unix_prefix_size = sizeof(unix_prefix) - 1;
254 
255     const char file_prefix[] = "file://";
256     const std::size_t file_prefix_size = sizeof(file_prefix) - 1;
257 
258 #ifdef YAMI4_WITH_QNX
259     const char qnx_prefix[] = "qnx://";
260     const std::size_t qnx_prefix_size = sizeof(qnx_prefix) - 1;
261 #endif // YAMI4_WITH_QNX
262 
263 #ifdef YAMI4_WITH_OPEN_SSL
264     const char tcps_prefix[] = "tcps://";
265     const std::size_t tcps_prefix_size = sizeof(tcps_prefix) - 1;
266 #endif // YAMI4_WITH_OPEN_SSL
267 
268     if (std::strncmp(tcp_prefix, target_, tcp_prefix_size) == 0)
269     {
270         const char * tcp_address = target_ + tcp_prefix_size;
271         res = connect_to_tcp(tcp_address);
272     }
273     else if (std::strncmp(udp_prefix, target_, udp_prefix_size) == 0)
274     {
275         const char * udp_address = target_ + udp_prefix_size;
276         res = connect_to_udp(udp_address);
277     }
278     else if (std::strncmp(unix_prefix, target_, unix_prefix_size) == 0)
279     {
280         const char * path = target_ + unix_prefix_size;
281         res = connect_to_unix(path);
282     }
283     else if (std::strncmp(file_prefix, target_, file_prefix_size) == 0)
284     {
285         const char * file_name = target_ + file_prefix_size;
286         res = connect_to_file(file_name);
287     }
288 #ifdef YAMI4_WITH_QNX
289     else if (std::strncmp(qnx_prefix, target_, qnx_prefix_size) == 0)
290     {
291         const char * qnx_name = target_ + qnx_prefix_size;
292         res = connect_to_qnx(qnx_name);
293     }
294 #endif // YAMI4_WITH_QNX
295 #ifdef YAMI4_WITH_OPEN_SSL
296     else if (std::strncmp(tcps_prefix, target_, tcps_prefix_size) == 0)
297     {
298         // note: connect_to_tcp() is used to create SSL connections,
299         // there are no special actions to be taken other than
300         // setting up different protocol value
301 
302         const char * tcp_address = target_ + tcps_prefix_size;
303         res = connect_to_tcp(tcp_address);
304 
305         if (res == core::ok)
306         {
307             protocol_ = proto_tcps;
308         }
309     }
310 #endif // YAMI4_WITH_OPEN_SSL
311     else
312     {
313         res = core::bad_protocol;
314     }
315 
316     return res;
317 }
318 
get_frame_size() const319 std::size_t channel::get_frame_size() const
320 {
321     return preferred_frame_size_;
322 }
323 
post(std::size_t priority,char ** buffers,const std::size_t * buffer_sizes,std::size_t number_of_frames,bool & first_frame,core::message_progress_function progress_callback,void * progress_hint)324 core::result channel::post(std::size_t priority,
325     char * * buffers, const std::size_t * buffer_sizes,
326     std::size_t number_of_frames,
327     bool & first_frame,
328     core::message_progress_function progress_callback,
329     void * progress_hint)
330 {
331     core::result res;
332 
333     if (mode_ != operational)
334     {
335         res = core::bad_state;
336     }
337     else
338     {
339         // this function inserts new set of nodes to the outgoing queue
340         // based on the frame descriptions given in parameters
341 
342         first_frame = first_outgoing_frame_ == NULL;
343 
344         std::size_t total_byte_count = 0;
345         for (std::size_t i = 0; i != number_of_frames; ++i)
346         {
347             total_byte_count += buffer_sizes[i];
348         }
349 
350         // prepare the sublist of new nodes
351 
352         res = core::ok;
353         std::size_t byte_count = 0;
354         outgoing_frame * first_new_frame = NULL;
355         outgoing_frame * last_new_frame = NULL;
356         for (std::size_t i = 0; i != number_of_frames; ++i)
357         {
358             outgoing_frame * frame = static_cast<outgoing_frame *>(
359                 alloc_->allocate(sizeof(outgoing_frame)));
360 
361             if (frame != NULL)
362             {
363                 if (first_new_frame == NULL)
364                 {
365                     first_new_frame = frame;
366                 }
367 
368                 byte_count += buffer_sizes[i];
369 
370                 frame->data = buffers[i];
371                 frame->size = buffer_sizes[i];
372                 frame->progress_callback = progress_callback;
373                 frame->progress_hint = progress_hint;
374                 frame->byte_count = byte_count;
375                 frame->total_byte_count = total_byte_count;
376                 frame->priority = priority;
377                 frame->close_flag = false;
378                 frame->next = NULL;
379 
380                 if (last_new_frame != NULL)
381                 {
382                     last_new_frame->next = frame;
383                 }
384 
385                 last_new_frame = frame;
386             }
387             else
388             {
389                 // rollback what was already allocated
390                 while (first_new_frame != NULL)
391                 {
392                     outgoing_frame * next = first_new_frame->next;
393                     alloc_->deallocate(first_new_frame);
394                     first_new_frame = next;
395                 }
396 
397                 res = core::no_memory;
398                 break;
399             }
400         }
401 
402         if (res == core::ok)
403         {
404             outgoing_frame * * insertion_point =
405                 find_outgoing_insertion_point(priority);
406 
407             // continuation of the list after newly added nodes
408             outgoing_frame * following_frame = *insertion_point;
409 
410             // insert new nodes
411 
412             last_new_frame->next = following_frame;
413             *insertion_point = first_new_frame;
414 
415             if (following_frame == NULL)
416             {
417                 // the nodes were added at the end of the list
418                 last_outgoing_frame_ = last_new_frame;
419             }
420 
421             pending_outgoing_bytes_ += total_byte_count;
422         }
423     }
424 
425     return res;
426 }
427 
do_some_work(io_direction direction,bool & close_me)428 core::result channel::do_some_work(io_direction direction, bool & close_me)
429 {
430     core::result res = core::ok;
431 
432     if (mode_ == operational && (direction == input || direction == inout))
433     {
434         res = do_some_input();
435 
436         if (res == core::io_error ||
437             res == core::channel_closed ||
438             res == core::unexpected_value)
439         {
440             mode_ = hard_close;
441             close_me = true;
442         }
443     }
444 
445     // output is allowed even in the soft closing state
446     // (pending messages need to be pushed out)
447     if (res == core::ok &&
448         (mode_ == operational || mode_ == soft_close) &&
449         (direction == output || direction == inout))
450     {
451         res = do_some_output(close_me);
452     }
453 
454     return res;
455 }
456 
post_close(std::size_t priority,bool & close_me)457 core::result channel::post_close(std::size_t priority, bool & close_me)
458 {
459     core::result res;
460 
461     if (mode_ != operational)
462     {
463         // second close on the same channel is not allowed
464         res = core::bad_state;
465     }
466     else
467     {
468         if (first_outgoing_frame_ == NULL)
469         {
470             // the list is empty, immediately close the connection
471 
472             mode_ = hard_close;
473             close_me = true;
474             res = core::ok;
475         }
476         else
477         {
478             outgoing_frame * poison_pill = static_cast<outgoing_frame *>(
479                 alloc_->allocate(sizeof(outgoing_frame)));
480 
481             if (poison_pill != NULL)
482             {
483                 poison_pill->data = NULL;
484                 poison_pill->size = 0;
485                 poison_pill->close_flag = true;
486                 poison_pill->next = NULL;
487 
488                 outgoing_frame * * insertion_point =
489                     find_outgoing_insertion_point(priority);
490 
491                 // continuation of the list after poison pill
492                 outgoing_frame * following_frame = *insertion_point;
493 
494                 // insert the poison pill
495                 // and clean the remaining part of the list
496 
497                 *insertion_point = poison_pill;
498                 last_outgoing_frame_ = poison_pill;
499 
500                 mode_ = soft_close;
501 
502                 // at this point the list is terminated
503                 // just after the poison pill,
504                 // which means that the following part of the list
505                 // is not visible to the outside observers
506                 // - it is safe to execute user callbacks in this state
507 
508                 clean_outgoing_frames(following_frame);
509 
510                 res = core::ok;
511             }
512             else
513             {
514                 res = core::no_memory;
515             }
516         }
517 
518         clean_incoming_messages();
519     }
520 
521     return res;
522 }
523 
move_target()524 const char * channel::move_target()
525 {
526     // destructive move to the caller,
527     // intended to call just before clean if the target information
528     // is still needed
529 
530     const char * res = target_;
531     target_ = NULL;
532     return res;
533 }
534 
get_io_descriptor(io_descriptor_type & fd,io_direction & direction) const535 void channel::get_io_descriptor(
536     io_descriptor_type & fd, io_direction & direction) const
537 {
538     fd = fd_;
539 
540     // if there are no outgoing frames, then the channel should not
541     // advertise itself as being interested in doing output operation
542     // it can still be interested in doing input
543 
544     switch (direction_)
545     {
546     case none:
547         // the channel should never be in this state
548         fatal_failure(__FILE__, __LINE__);
549         break;
550     case input:
551         direction = input;
552         break;
553     case output:
554         if (first_outgoing_frame_ != NULL)
555         {
556             direction = output;
557         }
558         else
559         {
560             direction = none;
561         }
562         break;
563     case inout:
564         if (first_outgoing_frame_ != NULL)
565         {
566             direction = inout;
567         }
568         else
569         {
570             direction = input;
571         }
572         break;
573     }
574 }
575 
find_outgoing_insertion_point(std::size_t priority)576 outgoing_frame * * channel::find_outgoing_insertion_point(
577     std::size_t priority)
578 {
579     // find the proper place for new node(s), according to priorities
580     // the first frame is left in place independent of priorities
581 
582     outgoing_frame * * insertion_point;
583     if (first_outgoing_frame_ == NULL)
584     {
585         // list is empty
586         insertion_point = &first_outgoing_frame_;
587     }
588     else
589     {
590         // the list contains at least one frame
591 
592         // check for the most common scenario,
593         // when the priority of the new node(s) is not higher
594         // than the priorities of existing nodes
595 
596         if (priority <= last_outgoing_frame_->priority)
597         {
598             // append at the end
599             insertion_point = &(last_outgoing_frame_->next);
600         }
601         else
602         {
603             // find the insertion point according to priorities
604             // but leave the first frame intact
605 
606             insertion_point = &(first_outgoing_frame_->next);
607 
608             while (*insertion_point != NULL &&
609                 (*insertion_point)->priority >= priority)
610             {
611                 insertion_point = &((*insertion_point)->next);
612             }
613         }
614     }
615 
616     return insertion_point;
617 }
618 
clean_outgoing_frames(outgoing_frame * frame)619 void channel::clean_outgoing_frames(outgoing_frame * frame)
620 {
621     // this function is also responsible for notifying all
622     // abandoned messages about their cancellation
623     // the notification (callback to the user code) is always safe,
624     // becuase this function is called in a state when the outgoing
625     // frames starting from frame are not visible to outside observers
626 
627     while (frame != NULL)
628     {
629         pending_outgoing_bytes_ -= frame->size;
630 
631         if (frame->close_flag == false &&
632             (frame->byte_count == frame->total_byte_count))
633         {
634             // this is the last outgoing frame for the given message
635 
636             core::message_progress_function progress_callback =
637                 frame->progress_callback;
638             void * progress_hint = frame->progress_hint;
639 
640             // notify the message that it was cancelled
641 
642             if (progress_callback != NULL)
643             {
644                 notify_cancellation(progress_callback, progress_hint);
645             }
646         }
647 
648         const char * data = frame->data;
649         if (data != NULL)
650         {
651             alloc_->deallocate(data);
652         }
653 
654         outgoing_frame * next = frame->next;
655         alloc_->deallocate(frame);
656         frame = next;
657     }
658 }
659 
notify_cancellation(core::message_progress_function progress_callback,void * progress_hint)660 void channel::notify_cancellation(
661     core::message_progress_function progress_callback,
662     void * progress_hint)
663 {
664     mtx_->unlock();
665 
666     try
667     {
668         // zero sizes in callback indicate cancellation
669         progress_callback(progress_hint, 0, 0);
670     }
671     catch (...)
672     {
673         // ignore exceptions from user callbacks
674     }
675 
676     // revert the locked state
677     mtx_->lock();
678 }
679 
parse_incoming_frame_header()680 void channel::parse_incoming_frame_header()
681 {
682     // note: the buffer has exactly 4 words,
683     // there is no possibility for error here
684 
685     const std::size_t num_of_buffers = 1;
686     const char * buffers[num_of_buffers];
687     std::size_t sizes[num_of_buffers];
688 
689     buffers[0] = frame_head_buffer_;
690     sizes[0] = frame_head_size;
691 
692     std::size_t current_buffer = 0;
693     const char * buffer_position = buffers[0];
694 
695     int value;
696     core::result res = get_integer(buffers, sizes, num_of_buffers,
697         current_buffer, buffer_position, value);
698     if (res != core::ok)
699     {
700         fatal_failure(__FILE__, __LINE__);
701     }
702 
703     current_message_id_ = value;
704 
705     res = get_integer(buffers, sizes, num_of_buffers,
706         current_buffer, buffer_position, value);
707     if (res != core::ok)
708     {
709         fatal_failure(__FILE__, __LINE__);
710     }
711 
712     if (value >= 0)
713     {
714         last_frame_ = false;
715         current_frame_number_ = value;
716     }
717     else
718     {
719         last_frame_ = true;
720         current_frame_number_ = -value;
721     }
722 
723     res = get_integer(buffers, sizes, num_of_buffers,
724         current_buffer, buffer_position, value);
725     if (res != core::ok)
726     {
727         fatal_failure(__FILE__, __LINE__);
728     }
729 
730     current_message_header_size_ = static_cast<std::size_t>(value);
731 
732     res = get_integer(buffers, sizes, num_of_buffers,
733         current_buffer, buffer_position, value);
734     if (res != core::ok)
735     {
736         fatal_failure(__FILE__, __LINE__);
737     }
738 
739     current_frame_payload_size_ = static_cast<std::size_t>(value);
740 }
741 
store_incoming_frame()742 core::result channel::store_incoming_frame()
743 {
744     core::result res;
745 
746     incoming_message_frame_list * list;
747 
748     incoming_frame * new_frame = static_cast<incoming_frame *>(
749         alloc_->allocate(sizeof(incoming_frame)));
750     if (new_frame != NULL)
751     {
752         new_frame->frame_number = current_frame_number_;
753         new_frame->data = current_frame_payload_;
754         new_frame->size = current_frame_payload_size_;
755 
756         current_frame_payload_ = NULL;
757 
758         if (first_incoming_frame_list_ == NULL)
759         {
760             // no incoming frame list yet,
761             // create the new list with single frame
762 
763             list = static_cast<incoming_message_frame_list *>(
764                 alloc_->allocate(sizeof(incoming_message_frame_list)));
765             if (list != NULL)
766             {
767                 list->message_id = current_message_id_;
768                 list->header_size = current_message_header_size_;
769                 list->check_completeness = last_frame_;
770                 list->next_list = NULL;
771 
772                 new_frame->next = NULL;
773                 list->first_frame = new_frame;
774                 list->last_frame = new_frame;
775 
776                 first_incoming_frame_list_ = list;
777                 last_incoming_frame_list_ = list;
778 
779                 res = core::ok;
780             }
781             else
782             {
783                 alloc_->deallocate(new_frame->data);
784                 alloc_->deallocate(new_frame);
785 
786                 res = core::no_memory;
787             }
788         }
789         else
790         {
791             // some lists already exist - find the list for the same message
792 
793             list = first_incoming_frame_list_;
794             while (list != NULL && list->message_id != current_message_id_)
795             {
796                 list = list->next_list;
797             }
798 
799             if (list != NULL)
800             {
801                 // found appropriate list,
802                 // insert the new (current) frame in the
803                 // correct location, according to its frame number
804                 // note: each list has at least one frame already in it,
805                 // there are no empty lists
806 
807                 // the optimistic and most frequent case is
808                 // when the current frame
809                 // should be appended to the end of the list
810 
811                 if (current_frame_number_ >= list->last_frame->frame_number)
812                 {
813                     new_frame->next = NULL;
814                     list->last_frame->next = new_frame;
815                     list->last_frame = new_frame;
816 
817                     if (last_frame_)
818                     {
819                         list->check_completeness = true;
820                     }
821                 }
822                 else
823                 {
824                     // insert new frame in between existing frames
825 
826                     incoming_frame * * insertion_point = &(list->first_frame);
827 
828                     while ((*insertion_point)->frame_number <=
829                         current_frame_number_)
830                     {
831                         insertion_point = &((*insertion_point)->next);
832                     }
833 
834                     new_frame->next = *insertion_point;
835                     *insertion_point = new_frame;
836 
837                     // use the message header size from first frame
838                     // (values from other frames are ignored)
839                     if (current_frame_number_ == 1)
840                     {
841                         list->header_size = current_message_header_size_;
842                     }
843                 }
844 
845                 res = core::ok;
846             }
847             else
848             {
849                 // no appropriate list found - this is the first frame
850                 // of the message that was not yet seen
851 
852                 // create new list with this single frame
853                 // and append it after all existing lists
854 
855                 list = static_cast<incoming_message_frame_list *>(
856                     alloc_->allocate(sizeof(incoming_message_frame_list)));
857                 if (list != NULL)
858                 {
859                     list->message_id = current_message_id_;
860                     list->header_size = current_message_header_size_;
861                     list->check_completeness = last_frame_;
862                     list->next_list = NULL;
863 
864                     new_frame->next = NULL;
865                     list->first_frame = new_frame;
866                     list->last_frame = new_frame;
867 
868                     last_incoming_frame_list_->next_list = list;
869                     last_incoming_frame_list_ = list;
870 
871                     res = core::ok;
872                 }
873                 else
874                 {
875                     alloc_->deallocate(new_frame->data);
876                     alloc_->deallocate(new_frame);
877 
878                     res = core::no_memory;
879                 }
880             }
881         }
882     }
883     else
884     {
885         alloc_->deallocate(current_frame_payload_);
886         current_frame_payload_ = NULL;
887 
888         res = core::no_memory;
889     }
890 
891     if (res == core::ok && list->check_completeness)
892     {
893         // this message already contains its closing frame
894         // -> check its completeness and dispatch
895 
896         res = dispatch_incoming_message(list);
897     }
898 
899     return res;
900 }
901 
dispatch_short_incoming_message()902 core::result channel::dispatch_short_incoming_message()
903 {
904     // the whole message (header and body) is contained in the single frame
905 
906     core::result res;
907 
908     if ((current_message_header_size_ >= current_frame_payload_size_) ||
909         (current_message_header_size_ % word_size != 0) ||
910         (current_frame_payload_size_ % word_size != 0))
911     {
912         alloc_->deallocate(current_frame_payload_);
913         current_frame_payload_ = NULL;
914 
915         res = core::unexpected_value;
916     }
917     else
918     {
919         if (event_notification_callback_ != NULL)
920         {
921             try
922             {
923                 event_notification_callback_(
924                     event_notification_hint_,
925                     core::message_received,
926                     target_,
927                     current_frame_payload_size_ + frame_head_size);
928             }
929             catch (...)
930             {
931                 // ignore errors from user callback
932             }
933         }
934 
935         if (incoming_message_callback_ != NULL)
936         {
937             const char * header_buffers[1];
938             header_buffers[0] = current_frame_payload_;
939 
940             std::size_t header_buffer_sizes[1];
941             header_buffer_sizes[0] = current_message_header_size_;
942 
943             std::size_t num_of_header_buffers = 1;
944 
945             const char * body_buffers[1];
946             body_buffers[0] =
947                 current_frame_payload_ + current_message_header_size_;
948 
949             std::size_t body_buffer_sizes[1];
950             body_buffer_sizes[0] =
951                 current_frame_payload_size_ - current_message_header_size_;
952 
953             std::size_t num_of_body_buffers = 1;
954 
955             mtx_->unlock();
956 
957             try
958             {
959                 incoming_message_callback_(
960                     incoming_message_hint_,
961                     target_,
962                     header_buffers,
963                     header_buffer_sizes,
964                     num_of_header_buffers,
965                     body_buffers,
966                     body_buffer_sizes,
967                     num_of_body_buffers);
968             }
969             catch (...)
970             {
971                 // ignore exceptions from user callbacks
972             }
973 
974             mtx_->lock();
975         }
976 
977         // the incoming buffer is already used at this point
978 
979         alloc_->deallocate(current_frame_payload_);
980         current_frame_payload_ = NULL;
981 
982         res = core::ok;
983     }
984 
985     return res;
986 }
987 
dispatch_incoming_message(incoming_message_frame_list * list)988 core::result channel::dispatch_incoming_message(
989     incoming_message_frame_list * list)
990 {
991     core::result res;
992 
993     // check sizes, completeness of the message and count frames
994 
995     std::size_t previous_frame_number = 0;
996 
997     std::size_t num_of_header_buffers = 0;
998     std::size_t num_of_body_buffers = 0;
999     incoming_frame * last_header_frame = NULL; // dummy initialization
1000     std::size_t header_part_of_last_header_frame = 0; // dummy initialization
1001     incoming_frame * first_body_frame = NULL;
1002     std::size_t body_offset_of_first_body_frame = 0; // dummy initialization
1003 
1004     std::size_t header_size_so_far = 0;
1005     enum { header, body } state = header;
1006 
1007     bool message_complete = true;
1008 
1009     incoming_frame * frame = list->first_frame;
1010     while (frame != NULL)
1011     {
1012         if (frame->frame_number != previous_frame_number + 1)
1013         {
1014             // message is not yet complete (missing frames)
1015             // continue without error, wait for next opportunity
1016 
1017             message_complete = false;
1018             frame = NULL;
1019         }
1020         else
1021         {
1022             if (state == header)
1023             {
1024                 ++num_of_header_buffers;
1025 
1026                 if (header_size_so_far + frame->size >= list->header_size)
1027                 {
1028                     // this frame completes the message header
1029 
1030                     last_header_frame = frame;
1031                     header_part_of_last_header_frame =
1032                         list->header_size - header_size_so_far;
1033 
1034                     if (header_size_so_far + frame->size > list->header_size)
1035                     {
1036                         // this frame also contains some initial data
1037                         // for the body
1038 
1039                         first_body_frame = frame;
1040                         body_offset_of_first_body_frame =
1041                             list->header_size - header_size_so_far;
1042 
1043                         ++num_of_body_buffers;
1044                     }
1045 
1046                     state = body;
1047                 }
1048                 else
1049                 {
1050                     header_size_so_far += frame->size;
1051                 }
1052             }
1053             else // state == body
1054             {
1055                 if (first_body_frame == NULL)
1056                 {
1057                     first_body_frame = frame;
1058                     body_offset_of_first_body_frame = 0;
1059                 }
1060 
1061                 ++num_of_body_buffers;
1062             }
1063 
1064             previous_frame_number = frame->frame_number;
1065             frame = frame->next;
1066         }
1067     }
1068 
1069     if (message_complete)
1070     {
1071         const char * * header_buffers = static_cast<const char * *>(
1072             alloc_->allocate(num_of_header_buffers * sizeof(const char *)));
1073         std::size_t * header_buffer_sizes = static_cast<std::size_t *>(
1074             alloc_->allocate(num_of_header_buffers * sizeof(std::size_t)));
1075         const char * * body_buffers = static_cast<const char * *>(
1076             alloc_->allocate(num_of_body_buffers * sizeof(const char *)));
1077         std::size_t * body_buffer_sizes = static_cast<std::size_t *>(
1078             alloc_->allocate(num_of_body_buffers * sizeof(std::size_t)));
1079 
1080         if (header_buffers != NULL &&
1081             header_buffer_sizes != NULL &&
1082             body_buffers != NULL &&
1083             body_buffer_sizes != NULL)
1084         {
1085             std::size_t total_bytes = 0;
1086             std::size_t i = 0;
1087             frame = list->first_frame;
1088             state = header;
1089             while (frame != NULL)
1090             {
1091                 if (state == header)
1092                 {
1093                     header_buffers[i] = frame->data;
1094                     if (frame != last_header_frame)
1095                     {
1096                         header_buffer_sizes[i] = frame->size;
1097 
1098                         ++i;
1099                     }
1100                     else
1101                     {
1102                         header_buffer_sizes[i] =
1103                             header_part_of_last_header_frame;
1104 
1105                         state = body;
1106                         i = 0;
1107 
1108                         if (frame == first_body_frame)
1109                         {
1110                             body_buffers[i] =
1111                                 frame->data + body_offset_of_first_body_frame;
1112                             body_buffer_sizes[i] =
1113                                 frame->size - body_offset_of_first_body_frame;
1114 
1115                             ++i;
1116                         }
1117                     }
1118                 }
1119                 else // state == body
1120                 {
1121                     body_buffers[i] = frame->data;
1122                     body_buffer_sizes[i] = frame->size;
1123 
1124                     ++i;
1125                 }
1126 
1127                 total_bytes += frame->size + frame_head_size;
1128 
1129                 frame = frame->next;
1130             }
1131 
1132             if (event_notification_callback_ != NULL)
1133             {
1134                 try
1135                 {
1136                     event_notification_callback_(
1137                         event_notification_hint_,
1138                         core::message_received,
1139                         target_, total_bytes);
1140                 }
1141                 catch (...)
1142                 {
1143                     // ignore errors from user callback
1144                 }
1145             }
1146 
1147             if (incoming_message_callback_ != NULL)
1148             {
1149                 mtx_->unlock();
1150 
1151                 try
1152                 {
1153                     incoming_message_callback_(
1154                         incoming_message_hint_,
1155                         target_,
1156                         header_buffers,
1157                         header_buffer_sizes,
1158                         num_of_header_buffers,
1159                         body_buffers,
1160                         body_buffer_sizes,
1161                         num_of_body_buffers);
1162                 }
1163                 catch (...)
1164                 {
1165                     // ignore exceptions from user callbacks
1166                 }
1167 
1168                 mtx_->lock();
1169             }
1170 
1171             // the incoming frames are no longer needed
1172 
1173             // detach this list from the rest
1174 
1175             incoming_message_frame_list * previous_list = NULL;
1176             incoming_message_frame_list * * removal_point =
1177                 &first_incoming_frame_list_;
1178             while (*removal_point != list)
1179             {
1180                 previous_list = *removal_point;
1181                 removal_point = &((*removal_point)->next_list);
1182             }
1183 
1184             if (*removal_point == last_incoming_frame_list_)
1185             {
1186                 // it is the last list of frames which is removed,
1187                 // update the pointer to the rightmost list
1188 
1189                 last_incoming_frame_list_ = previous_list;
1190             }
1191 
1192             *removal_point = (*removal_point)->next_list;
1193 
1194             clean_incoming_frames(list);
1195             alloc_->deallocate(list);
1196 
1197             res = core::ok;
1198         }
1199         else
1200         {
1201             res = core::no_memory;
1202         }
1203 
1204         if (header_buffers != NULL)
1205         {
1206             alloc_->deallocate(header_buffers);
1207         }
1208         if (header_buffer_sizes != NULL)
1209         {
1210             alloc_->deallocate(header_buffer_sizes);
1211         }
1212         if (body_buffers != NULL)
1213         {
1214             alloc_->deallocate(body_buffers);
1215         }
1216         if (body_buffer_sizes != NULL)
1217         {
1218             alloc_->deallocate(body_buffer_sizes);
1219         }
1220     }
1221     else
1222     {
1223         // do not report errors on incomplete messages
1224         // wait for the next opportunity to test for completeness
1225 
1226         res = core::ok;
1227     }
1228 
1229     return res;
1230 }
1231 
clean_incoming_messages()1232 void channel::clean_incoming_messages()
1233 {
1234     if (current_frame_payload_ != NULL)
1235     {
1236         alloc_->deallocate(current_frame_payload_);
1237         current_frame_payload_ = NULL;
1238     }
1239 
1240     // clean all incoming lists
1241     while (first_incoming_frame_list_ != NULL)
1242     {
1243         clean_incoming_frames(first_incoming_frame_list_);
1244 
1245         incoming_message_frame_list * next_list =
1246             first_incoming_frame_list_->next_list;
1247         alloc_->deallocate(first_incoming_frame_list_);
1248         first_incoming_frame_list_ = next_list;
1249     }
1250 
1251     last_incoming_frame_list_ = NULL;
1252 }
1253 
clean_incoming_frames(incoming_message_frame_list * list)1254 void channel::clean_incoming_frames(incoming_message_frame_list * list)
1255 {
1256     incoming_frame * frame = list->first_frame;
1257     while (frame != NULL)
1258     {
1259         alloc_->deallocate(frame->data);
1260 
1261         incoming_frame * next = frame->next;
1262         alloc_->deallocate(frame);
1263         frame = next;
1264     }
1265 }
1266 
get_first_outgoing_frame() const1267 outgoing_frame * channel::get_first_outgoing_frame() const
1268 {
1269     return first_outgoing_frame_;
1270 }
1271 
get_last_outgoing_frame() const1272 outgoing_frame * channel::get_last_outgoing_frame() const
1273 {
1274     return last_outgoing_frame_;
1275 }
1276 
get_first_incoming_frame_list() const1277 incoming_message_frame_list * channel::get_first_incoming_frame_list() const
1278 {
1279     return first_incoming_frame_list_;
1280 }
1281 
get_last_incoming_frame_list() const1282 incoming_message_frame_list * channel::get_last_incoming_frame_list() const
1283 {
1284     return last_incoming_frame_list_;
1285 }
1286 
get_incoming_state() const1287 incoming_fsm channel::get_incoming_state() const
1288 {
1289     return incoming_state_;
1290 }
1291 
get_pending_outgoing_bytes() const1292 std::size_t channel::get_pending_outgoing_bytes() const
1293 {
1294     return pending_outgoing_bytes_;
1295 }
1296 
process_complete_incoming_frame(const char * buffer,const std::size_t buffer_size)1297 core::result channel::process_complete_incoming_frame(
1298     const char * buffer, const std::size_t buffer_size)
1299 {
1300     core::result res;
1301 
1302     // parse the header
1303 
1304     if (buffer_size >= frame_head_size)
1305     {
1306         std::memcpy(frame_head_buffer_, buffer, frame_head_size);
1307         parse_incoming_frame_header();
1308 
1309         if (current_frame_payload_size_ == buffer_size - frame_head_size)
1310         {
1311             // behave as if the whole payload has been already read
1312 
1313             current_frame_payload_ = static_cast<char *>(
1314                 alloc_->allocate(current_frame_payload_size_));
1315             if (current_frame_payload_ != NULL)
1316             {
1317                 std::memcpy(current_frame_payload_,
1318                     buffer + frame_head_size, current_frame_payload_size_);
1319 
1320                 // short messages fit entirely in a single frame,
1321                 // so can be dispatched without creating
1322                 // any additional data structures
1323 
1324                 if (current_frame_number_ == 1 && last_frame_)
1325                 {
1326                     res = dispatch_short_incoming_message();
1327                 }
1328                 else
1329                 {
1330                     res = store_incoming_frame();
1331                 }
1332 
1333                 current_message_header_size_ = 0;
1334             }
1335             else
1336             {
1337                 res = core::no_memory;
1338             }
1339         }
1340         else
1341         {
1342             // corrupted buffer
1343 
1344             res = core::unexpected_value;
1345         }
1346     }
1347     else
1348     {
1349         // corrupted buffer
1350 
1351         res = core::unexpected_value;
1352     }
1353 
1354     return res;
1355 }
1356 
do_some_input()1357 core::result channel::do_some_input()
1358 {
1359     core::result res;
1360     std::size_t readn;
1361 
1362     if (protocol_ == proto_udp
1363 #ifdef YAMI4_WITH_QNX
1364         // added for completeness,
1365         // it is an error to attempt to read from QNX channel
1366         // (the error is reported by read_bytes)
1367         || protocol_ == proto_qnx
1368 #endif // YAMI4_WITH_QNX
1369     )
1370     {
1371         // This is a datagram protocol,
1372         // read the whole frame in a single operation.
1373 
1374         res = read_bytes(datagram_buffer_, preferred_frame_size_, readn);
1375         if (res == core::ok)
1376         {
1377             res = process_complete_incoming_frame(datagram_buffer_, readn);
1378         }
1379     }
1380     else
1381     {
1382         // This is a stream protocol,
1383         // read the frame header or frame payload according
1384         // to the state machine.
1385 
1386         if (incoming_state_ == read_frame_header)
1387         {
1388             res = read_bytes(frame_head_buffer_ + read_offset_,
1389                 frame_head_size - read_offset_, readn);
1390             if (res == core::ok)
1391             {
1392                 read_offset_ += readn;
1393 
1394                 if (read_offset_ == frame_head_size)
1395                 {
1396                     // the whole frame header has been read
1397 
1398                     parse_incoming_frame_header();
1399 
1400                     if (current_frame_payload_size_ != 0 &&
1401                         current_frame_payload_size_ % 4 == 0)
1402                     {
1403                         current_frame_payload_ = static_cast<char *>(
1404                             alloc_->allocate(current_frame_payload_size_));
1405                         if (current_frame_payload_ != NULL)
1406                         {
1407                             read_offset_ = 0;
1408                             incoming_state_ = read_frame_payload;
1409                         }
1410                         else
1411                         {
1412                             res = core::no_memory;
1413                         }
1414                     }
1415                     else
1416                     {
1417                         res = core::unexpected_value;
1418                     }
1419                 }
1420             }
1421         }
1422         else // incoming_state_ == read_frame_payload
1423         {
1424             res = read_bytes(current_frame_payload_ + read_offset_,
1425                 current_frame_payload_size_ - read_offset_, readn);
1426             if (res == core::ok)
1427             {
1428                 read_offset_ += readn;
1429 
1430                 if (read_offset_ == current_frame_payload_size_)
1431                 {
1432                     // the whole payload has been read
1433 
1434                     // short messages fit entirely in a single frame,
1435                     // so can be dispatched without creating
1436                     // any additional data structures
1437 
1438                     if (current_frame_number_ == 1 && last_frame_)
1439                     {
1440                         res = dispatch_short_incoming_message();
1441                     }
1442                     else
1443                     {
1444                         res = store_incoming_frame();
1445                     }
1446 
1447                     current_message_header_size_ = 0;
1448                     read_offset_ = 0;
1449                     incoming_state_ = read_frame_header;
1450                 }
1451             }
1452         }
1453 
1454         if (res == core::io_error)
1455         {
1456             // general I/O error
1457             // abandon all incoming frames, because they all
1458             // belong to messages that are still being read
1459             // and there is no hope of resynchronizing the stream
1460             // to get them right again
1461 
1462             clean_incoming_messages();
1463         }
1464     }
1465 
1466     return res;
1467 }
1468 
1469 // common helper for stream and datagram channels,
1470 // called when the complete frame was pushed out
notify_progress_and_consume_frame_list()1471 void channel::notify_progress_and_consume_frame_list()
1472 {
1473     pending_outgoing_bytes_ -= first_outgoing_frame_->size;
1474 
1475     const std::size_t sent_bytes =
1476         first_outgoing_frame_->byte_count;
1477     const std::size_t total_message_size =
1478         first_outgoing_frame_->total_byte_count;
1479 
1480     if (event_notification_callback_ != NULL)
1481     {
1482         if (sent_bytes == total_message_size)
1483         {
1484             try
1485             {
1486                 event_notification_callback_(
1487                     event_notification_hint_,
1488                     core::message_sent,
1489                     target_, total_message_size);
1490             }
1491             catch (...)
1492             {
1493                 // ignore errors from user callback
1494             }
1495         }
1496     }
1497 
1498     core::message_progress_function progress_callback =
1499         first_outgoing_frame_->progress_callback;
1500     void * progress_hint =
1501         first_outgoing_frame_->progress_hint;
1502 
1503     if (progress_callback != NULL)
1504     {
1505         mtx_->unlock();
1506 
1507         try
1508         {
1509             progress_callback(progress_hint,
1510                 sent_bytes, total_message_size);
1511         }
1512         catch (...)
1513         {
1514             // ignore exceptions from user callbacks
1515         }
1516 
1517         mtx_->lock();
1518     }
1519 
1520     outgoing_frame * next = first_outgoing_frame_->next;
1521 
1522     alloc_->deallocate(first_outgoing_frame_->data);
1523     alloc_->deallocate(first_outgoing_frame_);
1524     first_outgoing_frame_ = next;
1525     output_frame_offset_ = 0;
1526     if (next == NULL)
1527     {
1528         // the list was exhausted
1529         last_outgoing_frame_ = NULL;
1530     }
1531 }
1532 
do_some_output(bool & close_me)1533 core::result channel::do_some_output(bool & close_me)
1534 {
1535     if (first_outgoing_frame_ == NULL)
1536     {
1537         fatal_failure(__FILE__, __LINE__);
1538     }
1539 
1540     core::result res;
1541 
1542     // close the connection if the poison pill happens to be
1543     // at the beginning of the queue
1544 
1545     if (first_outgoing_frame_->close_flag)
1546     {
1547         // there are no more frames in the queue
1548         if (first_outgoing_frame_->next != NULL)
1549         {
1550             fatal_failure(__FILE__, __LINE__);
1551         }
1552 
1553         mode_ = hard_close;
1554         close_me = true;
1555         res = core::ok;
1556     }
1557     else
1558     {
1559         if (protocol_ == proto_udp
1560 #ifdef YAMI4_WITH_QNX
1561             || protocol_ == proto_qnx
1562 #endif // YAMI4_WITH_QNX
1563         )
1564         {
1565             // This is a datagram (UDP or QNX) channel,
1566             // write the whole frame in a single operation.
1567 
1568             std::size_t written;
1569             res = write_bytes(
1570                 first_outgoing_frame_->data + output_frame_offset_,
1571                 first_outgoing_frame_->size - output_frame_offset_,
1572                 written);
1573             if (res == core::ok)
1574             {
1575                 // the whole frame has been pushed
1576 
1577                 notify_progress_and_consume_frame_list();
1578             }
1579         }
1580         else
1581         {
1582             // This is a stream channel.
1583 
1584             std::size_t written;
1585             res = write_bytes(
1586                 first_outgoing_frame_->data + output_frame_offset_,
1587                 first_outgoing_frame_->size - output_frame_offset_,
1588                 written);
1589             if (res == core::ok)
1590             {
1591                 output_frame_offset_ += written;
1592 
1593                 if (output_frame_offset_ == first_outgoing_frame_->size)
1594                 {
1595                     // the whole frame has been already pushed
1596 
1597                     notify_progress_and_consume_frame_list();
1598                 }
1599             }
1600         }
1601 
1602         if (res != core::ok)
1603         {
1604             // general I/O error
1605             // request close
1606             // (all frames in the output queue will be cleaned up
1607             // and progress will be notified as part of the channel closing)
1608 
1609             mode_ = hard_close;
1610             close_me = true;
1611         }
1612     }
1613 
1614     return res;
1615 }
1616