1 // Copyright Maciej Sobczak 2008-2019.
2 // This file is part of YAMI4.
3 //
4 // YAMI4 is free software: you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation, either version 3 of the License, or
7 // (at your option) any later version.
8 //
9 // YAMI4 is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with YAMI4. If not, see <http://www.gnu.org/licenses/>.
16
17 #include "channel.h"
18 #include "allocator.h"
19 #include "fatal_errors.h"
20 #include "incoming_frame.h"
21 #include "incoming_message_frame_list.h"
22 #include "outgoing_frame.h"
23 #include "serialization.h"
24 #include <cstring>
25
26 // selected per platform
27 #include <mutex.h>
28
29 using namespace yami;
30 using namespace details;
31
common_init(allocator & alloc,mutex & mtx,const options & configuration_options,const core::parameters * overriding_options,core::incoming_message_dispatch_function incoming_message_callback,void * incoming_message_hint,core::event_notification_function event_notification_callback,void * event_notification_hint,core::io_error_function io_error_callback,void * io_error_callback_hint)32 void channel::common_init(allocator & alloc, mutex & mtx,
33 const options & configuration_options,
34 const core::parameters * overriding_options,
35 core::incoming_message_dispatch_function incoming_message_callback,
36 void * incoming_message_hint,
37 core::event_notification_function event_notification_callback,
38 void * event_notification_hint,
39 core::io_error_function io_error_callback,
40 void * io_error_callback_hint)
41 {
42 configuration_options_ = configuration_options;
43 configuration_options_.override_with_user_values(overriding_options);
44 alloc_ = &alloc;
45 mtx_ = &mtx;
46 target_ = NULL;
47 ref_count_ = 1;
48 mode_ = operational;
49 direction_ = input;
50 fd_ = empty_io_descriptor;
51 selector_index_ = -1;
52 frame_buffer_ = NULL;
53 buffer_capacity_ = 0; // this will be set lazily
54 buffer_available_ = 0;
55 buffer_consumed_ = 0;
56 target_address_ = NULL;
57 datagram_buffer_ = NULL;
58 output_frame_offset_ = 0;
59 first_outgoing_frame_ = NULL;
60 last_outgoing_frame_ = NULL;
61 incoming_state_ = read_frame_header;
62 read_offset_ = 0;
63 current_message_id_ = 0;
64 current_frame_number_ = 0;
65 current_message_header_size_ = 0;
66 current_frame_payload_size_ = 0;
67 current_frame_payload_ = NULL;
68 last_frame_ = false;
69 first_incoming_frame_list_ = NULL;
70 last_incoming_frame_list_ = NULL;
71 pending_outgoing_bytes_ = 0;
72 incoming_message_callback_ = incoming_message_callback;
73 incoming_message_hint_ = incoming_message_hint;
74 event_notification_callback_ = event_notification_callback;
75 event_notification_hint_ = event_notification_hint;
76 io_error_callback_ = io_error_callback;
77 io_error_callback_hint_ = io_error_callback_hint;
78
79 #ifdef YAMI4_WITH_OPEN_SSL
80 ssl_ = NULL;
81 #endif // YAMI4_WITH_OPEN_SSL
82 }
83
84 // to be synchronized by caller
clean()85 void channel::clean()
86 {
87 close_connection();
88
89 if (target_ != NULL)
90 {
91 alloc_->deallocate(target_);
92 }
93
94 if (protocol_ == proto_udp)
95 {
96 if (target_address_ != NULL)
97 {
98 alloc_->deallocate(target_address_);
99 }
100
101 if (datagram_buffer_ != NULL)
102 {
103 alloc_->deallocate(datagram_buffer_);
104 }
105 }
106
107 if (frame_buffer_ != NULL)
108 {
109 alloc_->deallocate(frame_buffer_);
110 }
111
112 clean_outgoing_frames(first_outgoing_frame_);
113 clean_incoming_messages();
114
115 #ifdef YAMI4_WITH_OPEN_SSL
116 if (ssl_ != NULL)
117 {
118 SSL_free(ssl_);
119 }
120 #endif // YAMI4_WITH_OPEN_SSL
121 }
122
123 // should be called outside of critical section
init(allocator & alloc,mutex & mtx,const options & configuration_options,const core::parameters * overriding_options,const char * target,core::incoming_message_dispatch_function incoming_message_callback,void * incoming_message_hint,core::event_notification_function event_notification_callback,void * event_notification_hint,core::io_error_function io_error_callback,void * io_error_callback_hint)124 core::result channel::init(allocator & alloc, mutex & mtx,
125 const options & configuration_options,
126 const core::parameters * overriding_options,
127 const char * target,
128 core::incoming_message_dispatch_function incoming_message_callback,
129 void * incoming_message_hint,
130 core::event_notification_function event_notification_callback,
131 void * event_notification_hint,
132 core::io_error_function io_error_callback,
133 void * io_error_callback_hint)
134 {
135 common_init(alloc, mtx, configuration_options, overriding_options,
136 incoming_message_callback, incoming_message_hint,
137 event_notification_callback, event_notification_hint,
138 io_error_callback, io_error_callback_hint);
139
140 const std::size_t target_length = std::strlen(target);
141
142 target_ = static_cast<char *>(alloc_->allocate(target_length + 1));
143
144 core::result res;
145 if (target_ != NULL)
146 {
147 std::strcpy(target_, target);
148
149 res = connect();
150
151 if (res != core::ok)
152 {
153 alloc_->deallocate(target_);
154 target_ = NULL;
155 }
156 }
157 else
158 {
159 res = core::no_memory;
160 }
161
162 return res;
163 }
164
init(allocator & alloc,mutex & mtx,const options & configuration_options,char * target,io_descriptor_type fd,protocol prot,std::size_t preferred_frame_size,core::incoming_message_dispatch_function incoming_message_callback,void * incoming_message_hint,core::event_notification_function event_notification_callback,void * event_notification_hint,core::io_error_function io_error_callback,void * io_error_callback_hint)165 core::result channel::init(allocator & alloc, mutex & mtx,
166 const options & configuration_options,
167 char * target,
168 io_descriptor_type fd, protocol prot, std::size_t preferred_frame_size,
169 core::incoming_message_dispatch_function incoming_message_callback,
170 void * incoming_message_hint,
171 core::event_notification_function event_notification_callback,
172 void * event_notification_hint,
173 core::io_error_function io_error_callback,
174 void * io_error_callback_hint)
175 {
176 common_init(alloc, mtx, configuration_options, NULL,
177 incoming_message_callback, incoming_message_hint,
178 event_notification_callback, event_notification_hint,
179 io_error_callback, io_error_callback_hint);
180
181 // ownership transfer
182 target_ = target;
183
184 alloc_ = &alloc;
185 fd_ = fd;
186 protocol_ = prot;
187 direction_ = inout;
188 preferred_frame_size_ = preferred_frame_size;
189
190 core::result res = core::ok;
191
192 if (prot == proto_tcp || prot == proto_unix || prot == proto_file)
193 {
194 // allocate a small read buffer for stream-oriented connections
195
196 // Note: this function is called when an existing channel is added
197 // to the group, which typically happens only for channels
198 // that are created after being accepted by TCP listeners.
199 // The remaining stream protocols (unix/file) are handled here
200 // defensively and for completeness.
201
202 switch (prot)
203 {
204 case proto_tcp:
205 buffer_capacity_ = configuration_options.tcp_frame_size;
206 break;
207 case proto_unix:
208 buffer_capacity_ = configuration_options.unix_frame_size;
209 break;
210 case proto_file:
211 buffer_capacity_ = configuration_options.file_frame_size;
212 break;
213 }
214
215 frame_buffer_ = static_cast<char *>(
216 alloc_->allocate(buffer_capacity_));
217 if (frame_buffer_ == NULL)
218 {
219 res = core::no_memory;
220 }
221 }
222
223 return res;
224 }
225
226 #ifdef YAMI4_WITH_OPEN_SSL
set_client_ssl(SSL * ssl)227 void channel::set_client_ssl(SSL * ssl)
228 {
229 ssl_ = ssl;
230
231 SSL_set_connect_state(ssl_);
232 }
233
set_server_ssl(SSL * ssl)234 void channel::set_server_ssl(SSL * ssl)
235 {
236 ssl_ = ssl;
237
238 SSL_set_accept_state(ssl_);
239 }
240 #endif // YAMI4_WITH_OPEN_SSL
241
connect()242 core::result channel::connect()
243 {
244 core::result res;
245
246 const char tcp_prefix[] = "tcp://";
247 const std::size_t tcp_prefix_size = sizeof(tcp_prefix) - 1;
248
249 const char udp_prefix[] = "udp://";
250 const std::size_t udp_prefix_size = sizeof(udp_prefix) - 1;
251
252 const char unix_prefix[] = "unix://";
253 const std::size_t unix_prefix_size = sizeof(unix_prefix) - 1;
254
255 const char file_prefix[] = "file://";
256 const std::size_t file_prefix_size = sizeof(file_prefix) - 1;
257
258 #ifdef YAMI4_WITH_QNX
259 const char qnx_prefix[] = "qnx://";
260 const std::size_t qnx_prefix_size = sizeof(qnx_prefix) - 1;
261 #endif // YAMI4_WITH_QNX
262
263 #ifdef YAMI4_WITH_OPEN_SSL
264 const char tcps_prefix[] = "tcps://";
265 const std::size_t tcps_prefix_size = sizeof(tcps_prefix) - 1;
266 #endif // YAMI4_WITH_OPEN_SSL
267
268 if (std::strncmp(tcp_prefix, target_, tcp_prefix_size) == 0)
269 {
270 const char * tcp_address = target_ + tcp_prefix_size;
271 res = connect_to_tcp(tcp_address);
272 }
273 else if (std::strncmp(udp_prefix, target_, udp_prefix_size) == 0)
274 {
275 const char * udp_address = target_ + udp_prefix_size;
276 res = connect_to_udp(udp_address);
277 }
278 else if (std::strncmp(unix_prefix, target_, unix_prefix_size) == 0)
279 {
280 const char * path = target_ + unix_prefix_size;
281 res = connect_to_unix(path);
282 }
283 else if (std::strncmp(file_prefix, target_, file_prefix_size) == 0)
284 {
285 const char * file_name = target_ + file_prefix_size;
286 res = connect_to_file(file_name);
287 }
288 #ifdef YAMI4_WITH_QNX
289 else if (std::strncmp(qnx_prefix, target_, qnx_prefix_size) == 0)
290 {
291 const char * qnx_name = target_ + qnx_prefix_size;
292 res = connect_to_qnx(qnx_name);
293 }
294 #endif // YAMI4_WITH_QNX
295 #ifdef YAMI4_WITH_OPEN_SSL
296 else if (std::strncmp(tcps_prefix, target_, tcps_prefix_size) == 0)
297 {
298 // note: connect_to_tcp() is used to create SSL connections,
299 // there are no special actions to be taken other than
300 // setting up different protocol value
301
302 const char * tcp_address = target_ + tcps_prefix_size;
303 res = connect_to_tcp(tcp_address);
304
305 if (res == core::ok)
306 {
307 protocol_ = proto_tcps;
308 }
309 }
310 #endif // YAMI4_WITH_OPEN_SSL
311 else
312 {
313 res = core::bad_protocol;
314 }
315
316 return res;
317 }
318
get_frame_size() const319 std::size_t channel::get_frame_size() const
320 {
321 return preferred_frame_size_;
322 }
323
post(std::size_t priority,char ** buffers,const std::size_t * buffer_sizes,std::size_t number_of_frames,bool & first_frame,core::message_progress_function progress_callback,void * progress_hint)324 core::result channel::post(std::size_t priority,
325 char * * buffers, const std::size_t * buffer_sizes,
326 std::size_t number_of_frames,
327 bool & first_frame,
328 core::message_progress_function progress_callback,
329 void * progress_hint)
330 {
331 core::result res;
332
333 if (mode_ != operational)
334 {
335 res = core::bad_state;
336 }
337 else
338 {
339 // this function inserts new set of nodes to the outgoing queue
340 // based on the frame descriptions given in parameters
341
342 first_frame = first_outgoing_frame_ == NULL;
343
344 std::size_t total_byte_count = 0;
345 for (std::size_t i = 0; i != number_of_frames; ++i)
346 {
347 total_byte_count += buffer_sizes[i];
348 }
349
350 // prepare the sublist of new nodes
351
352 res = core::ok;
353 std::size_t byte_count = 0;
354 outgoing_frame * first_new_frame = NULL;
355 outgoing_frame * last_new_frame = NULL;
356 for (std::size_t i = 0; i != number_of_frames; ++i)
357 {
358 outgoing_frame * frame = static_cast<outgoing_frame *>(
359 alloc_->allocate(sizeof(outgoing_frame)));
360
361 if (frame != NULL)
362 {
363 if (first_new_frame == NULL)
364 {
365 first_new_frame = frame;
366 }
367
368 byte_count += buffer_sizes[i];
369
370 frame->data = buffers[i];
371 frame->size = buffer_sizes[i];
372 frame->progress_callback = progress_callback;
373 frame->progress_hint = progress_hint;
374 frame->byte_count = byte_count;
375 frame->total_byte_count = total_byte_count;
376 frame->priority = priority;
377 frame->close_flag = false;
378 frame->next = NULL;
379
380 if (last_new_frame != NULL)
381 {
382 last_new_frame->next = frame;
383 }
384
385 last_new_frame = frame;
386 }
387 else
388 {
389 // rollback what was already allocated
390 while (first_new_frame != NULL)
391 {
392 outgoing_frame * next = first_new_frame->next;
393 alloc_->deallocate(first_new_frame);
394 first_new_frame = next;
395 }
396
397 res = core::no_memory;
398 break;
399 }
400 }
401
402 if (res == core::ok)
403 {
404 outgoing_frame * * insertion_point =
405 find_outgoing_insertion_point(priority);
406
407 // continuation of the list after newly added nodes
408 outgoing_frame * following_frame = *insertion_point;
409
410 // insert new nodes
411
412 last_new_frame->next = following_frame;
413 *insertion_point = first_new_frame;
414
415 if (following_frame == NULL)
416 {
417 // the nodes were added at the end of the list
418 last_outgoing_frame_ = last_new_frame;
419 }
420
421 pending_outgoing_bytes_ += total_byte_count;
422 }
423 }
424
425 return res;
426 }
427
do_some_work(io_direction direction,bool & close_me)428 core::result channel::do_some_work(io_direction direction, bool & close_me)
429 {
430 core::result res = core::ok;
431
432 if (mode_ == operational && (direction == input || direction == inout))
433 {
434 res = do_some_input();
435
436 if (res == core::io_error ||
437 res == core::channel_closed ||
438 res == core::unexpected_value)
439 {
440 mode_ = hard_close;
441 close_me = true;
442 }
443 }
444
445 // output is allowed even in the soft closing state
446 // (pending messages need to be pushed out)
447 if (res == core::ok &&
448 (mode_ == operational || mode_ == soft_close) &&
449 (direction == output || direction == inout))
450 {
451 res = do_some_output(close_me);
452 }
453
454 return res;
455 }
456
post_close(std::size_t priority,bool & close_me)457 core::result channel::post_close(std::size_t priority, bool & close_me)
458 {
459 core::result res;
460
461 if (mode_ != operational)
462 {
463 // second close on the same channel is not allowed
464 res = core::bad_state;
465 }
466 else
467 {
468 if (first_outgoing_frame_ == NULL)
469 {
470 // the list is empty, immediately close the connection
471
472 mode_ = hard_close;
473 close_me = true;
474 res = core::ok;
475 }
476 else
477 {
478 outgoing_frame * poison_pill = static_cast<outgoing_frame *>(
479 alloc_->allocate(sizeof(outgoing_frame)));
480
481 if (poison_pill != NULL)
482 {
483 poison_pill->data = NULL;
484 poison_pill->size = 0;
485 poison_pill->close_flag = true;
486 poison_pill->next = NULL;
487
488 outgoing_frame * * insertion_point =
489 find_outgoing_insertion_point(priority);
490
491 // continuation of the list after poison pill
492 outgoing_frame * following_frame = *insertion_point;
493
494 // insert the poison pill
495 // and clean the remaining part of the list
496
497 *insertion_point = poison_pill;
498 last_outgoing_frame_ = poison_pill;
499
500 mode_ = soft_close;
501
502 // at this point the list is terminated
503 // just after the poison pill,
504 // which means that the following part of the list
505 // is not visible to the outside observers
506 // - it is safe to execute user callbacks in this state
507
508 clean_outgoing_frames(following_frame);
509
510 res = core::ok;
511 }
512 else
513 {
514 res = core::no_memory;
515 }
516 }
517
518 clean_incoming_messages();
519 }
520
521 return res;
522 }
523
move_target()524 const char * channel::move_target()
525 {
526 // destructive move to the caller,
527 // intended to call just before clean if the target information
528 // is still needed
529
530 const char * res = target_;
531 target_ = NULL;
532 return res;
533 }
534
get_io_descriptor(io_descriptor_type & fd,io_direction & direction) const535 void channel::get_io_descriptor(
536 io_descriptor_type & fd, io_direction & direction) const
537 {
538 fd = fd_;
539
540 // if there are no outgoing frames, then the channel should not
541 // advertise itself as being interested in doing output operation
542 // it can still be interested in doing input
543
544 switch (direction_)
545 {
546 case none:
547 // the channel should never be in this state
548 fatal_failure(__FILE__, __LINE__);
549 break;
550 case input:
551 direction = input;
552 break;
553 case output:
554 if (first_outgoing_frame_ != NULL)
555 {
556 direction = output;
557 }
558 else
559 {
560 direction = none;
561 }
562 break;
563 case inout:
564 if (first_outgoing_frame_ != NULL)
565 {
566 direction = inout;
567 }
568 else
569 {
570 direction = input;
571 }
572 break;
573 }
574 }
575
find_outgoing_insertion_point(std::size_t priority)576 outgoing_frame * * channel::find_outgoing_insertion_point(
577 std::size_t priority)
578 {
579 // find the proper place for new node(s), according to priorities
580 // the first frame is left in place independent of priorities
581
582 outgoing_frame * * insertion_point;
583 if (first_outgoing_frame_ == NULL)
584 {
585 // list is empty
586 insertion_point = &first_outgoing_frame_;
587 }
588 else
589 {
590 // the list contains at least one frame
591
592 // check for the most common scenario,
593 // when the priority of the new node(s) is not higher
594 // than the priorities of existing nodes
595
596 if (priority <= last_outgoing_frame_->priority)
597 {
598 // append at the end
599 insertion_point = &(last_outgoing_frame_->next);
600 }
601 else
602 {
603 // find the insertion point according to priorities
604 // but leave the first frame intact
605
606 insertion_point = &(first_outgoing_frame_->next);
607
608 while (*insertion_point != NULL &&
609 (*insertion_point)->priority >= priority)
610 {
611 insertion_point = &((*insertion_point)->next);
612 }
613 }
614 }
615
616 return insertion_point;
617 }
618
clean_outgoing_frames(outgoing_frame * frame)619 void channel::clean_outgoing_frames(outgoing_frame * frame)
620 {
621 // this function is also responsible for notifying all
622 // abandoned messages about their cancellation
623 // the notification (callback to the user code) is always safe,
624 // becuase this function is called in a state when the outgoing
625 // frames starting from frame are not visible to outside observers
626
627 while (frame != NULL)
628 {
629 pending_outgoing_bytes_ -= frame->size;
630
631 if (frame->close_flag == false &&
632 (frame->byte_count == frame->total_byte_count))
633 {
634 // this is the last outgoing frame for the given message
635
636 core::message_progress_function progress_callback =
637 frame->progress_callback;
638 void * progress_hint = frame->progress_hint;
639
640 // notify the message that it was cancelled
641
642 if (progress_callback != NULL)
643 {
644 notify_cancellation(progress_callback, progress_hint);
645 }
646 }
647
648 const char * data = frame->data;
649 if (data != NULL)
650 {
651 alloc_->deallocate(data);
652 }
653
654 outgoing_frame * next = frame->next;
655 alloc_->deallocate(frame);
656 frame = next;
657 }
658 }
659
notify_cancellation(core::message_progress_function progress_callback,void * progress_hint)660 void channel::notify_cancellation(
661 core::message_progress_function progress_callback,
662 void * progress_hint)
663 {
664 mtx_->unlock();
665
666 try
667 {
668 // zero sizes in callback indicate cancellation
669 progress_callback(progress_hint, 0, 0);
670 }
671 catch (...)
672 {
673 // ignore exceptions from user callbacks
674 }
675
676 // revert the locked state
677 mtx_->lock();
678 }
679
parse_incoming_frame_header()680 void channel::parse_incoming_frame_header()
681 {
682 // note: the buffer has exactly 4 words,
683 // there is no possibility for error here
684
685 const std::size_t num_of_buffers = 1;
686 const char * buffers[num_of_buffers];
687 std::size_t sizes[num_of_buffers];
688
689 buffers[0] = frame_head_buffer_;
690 sizes[0] = frame_head_size;
691
692 std::size_t current_buffer = 0;
693 const char * buffer_position = buffers[0];
694
695 int value;
696 core::result res = get_integer(buffers, sizes, num_of_buffers,
697 current_buffer, buffer_position, value);
698 if (res != core::ok)
699 {
700 fatal_failure(__FILE__, __LINE__);
701 }
702
703 current_message_id_ = value;
704
705 res = get_integer(buffers, sizes, num_of_buffers,
706 current_buffer, buffer_position, value);
707 if (res != core::ok)
708 {
709 fatal_failure(__FILE__, __LINE__);
710 }
711
712 if (value >= 0)
713 {
714 last_frame_ = false;
715 current_frame_number_ = value;
716 }
717 else
718 {
719 last_frame_ = true;
720 current_frame_number_ = -value;
721 }
722
723 res = get_integer(buffers, sizes, num_of_buffers,
724 current_buffer, buffer_position, value);
725 if (res != core::ok)
726 {
727 fatal_failure(__FILE__, __LINE__);
728 }
729
730 current_message_header_size_ = static_cast<std::size_t>(value);
731
732 res = get_integer(buffers, sizes, num_of_buffers,
733 current_buffer, buffer_position, value);
734 if (res != core::ok)
735 {
736 fatal_failure(__FILE__, __LINE__);
737 }
738
739 current_frame_payload_size_ = static_cast<std::size_t>(value);
740 }
741
store_incoming_frame()742 core::result channel::store_incoming_frame()
743 {
744 core::result res;
745
746 incoming_message_frame_list * list;
747
748 incoming_frame * new_frame = static_cast<incoming_frame *>(
749 alloc_->allocate(sizeof(incoming_frame)));
750 if (new_frame != NULL)
751 {
752 new_frame->frame_number = current_frame_number_;
753 new_frame->data = current_frame_payload_;
754 new_frame->size = current_frame_payload_size_;
755
756 current_frame_payload_ = NULL;
757
758 if (first_incoming_frame_list_ == NULL)
759 {
760 // no incoming frame list yet,
761 // create the new list with single frame
762
763 list = static_cast<incoming_message_frame_list *>(
764 alloc_->allocate(sizeof(incoming_message_frame_list)));
765 if (list != NULL)
766 {
767 list->message_id = current_message_id_;
768 list->header_size = current_message_header_size_;
769 list->check_completeness = last_frame_;
770 list->next_list = NULL;
771
772 new_frame->next = NULL;
773 list->first_frame = new_frame;
774 list->last_frame = new_frame;
775
776 first_incoming_frame_list_ = list;
777 last_incoming_frame_list_ = list;
778
779 res = core::ok;
780 }
781 else
782 {
783 alloc_->deallocate(new_frame->data);
784 alloc_->deallocate(new_frame);
785
786 res = core::no_memory;
787 }
788 }
789 else
790 {
791 // some lists already exist - find the list for the same message
792
793 list = first_incoming_frame_list_;
794 while (list != NULL && list->message_id != current_message_id_)
795 {
796 list = list->next_list;
797 }
798
799 if (list != NULL)
800 {
801 // found appropriate list,
802 // insert the new (current) frame in the
803 // correct location, according to its frame number
804 // note: each list has at least one frame already in it,
805 // there are no empty lists
806
807 // the optimistic and most frequent case is
808 // when the current frame
809 // should be appended to the end of the list
810
811 if (current_frame_number_ >= list->last_frame->frame_number)
812 {
813 new_frame->next = NULL;
814 list->last_frame->next = new_frame;
815 list->last_frame = new_frame;
816
817 if (last_frame_)
818 {
819 list->check_completeness = true;
820 }
821 }
822 else
823 {
824 // insert new frame in between existing frames
825
826 incoming_frame * * insertion_point = &(list->first_frame);
827
828 while ((*insertion_point)->frame_number <=
829 current_frame_number_)
830 {
831 insertion_point = &((*insertion_point)->next);
832 }
833
834 new_frame->next = *insertion_point;
835 *insertion_point = new_frame;
836
837 // use the message header size from first frame
838 // (values from other frames are ignored)
839 if (current_frame_number_ == 1)
840 {
841 list->header_size = current_message_header_size_;
842 }
843 }
844
845 res = core::ok;
846 }
847 else
848 {
849 // no appropriate list found - this is the first frame
850 // of the message that was not yet seen
851
852 // create new list with this single frame
853 // and append it after all existing lists
854
855 list = static_cast<incoming_message_frame_list *>(
856 alloc_->allocate(sizeof(incoming_message_frame_list)));
857 if (list != NULL)
858 {
859 list->message_id = current_message_id_;
860 list->header_size = current_message_header_size_;
861 list->check_completeness = last_frame_;
862 list->next_list = NULL;
863
864 new_frame->next = NULL;
865 list->first_frame = new_frame;
866 list->last_frame = new_frame;
867
868 last_incoming_frame_list_->next_list = list;
869 last_incoming_frame_list_ = list;
870
871 res = core::ok;
872 }
873 else
874 {
875 alloc_->deallocate(new_frame->data);
876 alloc_->deallocate(new_frame);
877
878 res = core::no_memory;
879 }
880 }
881 }
882 }
883 else
884 {
885 alloc_->deallocate(current_frame_payload_);
886 current_frame_payload_ = NULL;
887
888 res = core::no_memory;
889 }
890
891 if (res == core::ok && list->check_completeness)
892 {
893 // this message already contains its closing frame
894 // -> check its completeness and dispatch
895
896 res = dispatch_incoming_message(list);
897 }
898
899 return res;
900 }
901
dispatch_short_incoming_message()902 core::result channel::dispatch_short_incoming_message()
903 {
904 // the whole message (header and body) is contained in the single frame
905
906 core::result res;
907
908 if ((current_message_header_size_ >= current_frame_payload_size_) ||
909 (current_message_header_size_ % word_size != 0) ||
910 (current_frame_payload_size_ % word_size != 0))
911 {
912 alloc_->deallocate(current_frame_payload_);
913 current_frame_payload_ = NULL;
914
915 res = core::unexpected_value;
916 }
917 else
918 {
919 if (event_notification_callback_ != NULL)
920 {
921 try
922 {
923 event_notification_callback_(
924 event_notification_hint_,
925 core::message_received,
926 target_,
927 current_frame_payload_size_ + frame_head_size);
928 }
929 catch (...)
930 {
931 // ignore errors from user callback
932 }
933 }
934
935 if (incoming_message_callback_ != NULL)
936 {
937 const char * header_buffers[1];
938 header_buffers[0] = current_frame_payload_;
939
940 std::size_t header_buffer_sizes[1];
941 header_buffer_sizes[0] = current_message_header_size_;
942
943 std::size_t num_of_header_buffers = 1;
944
945 const char * body_buffers[1];
946 body_buffers[0] =
947 current_frame_payload_ + current_message_header_size_;
948
949 std::size_t body_buffer_sizes[1];
950 body_buffer_sizes[0] =
951 current_frame_payload_size_ - current_message_header_size_;
952
953 std::size_t num_of_body_buffers = 1;
954
955 mtx_->unlock();
956
957 try
958 {
959 incoming_message_callback_(
960 incoming_message_hint_,
961 target_,
962 header_buffers,
963 header_buffer_sizes,
964 num_of_header_buffers,
965 body_buffers,
966 body_buffer_sizes,
967 num_of_body_buffers);
968 }
969 catch (...)
970 {
971 // ignore exceptions from user callbacks
972 }
973
974 mtx_->lock();
975 }
976
977 // the incoming buffer is already used at this point
978
979 alloc_->deallocate(current_frame_payload_);
980 current_frame_payload_ = NULL;
981
982 res = core::ok;
983 }
984
985 return res;
986 }
987
dispatch_incoming_message(incoming_message_frame_list * list)988 core::result channel::dispatch_incoming_message(
989 incoming_message_frame_list * list)
990 {
991 core::result res;
992
993 // check sizes, completeness of the message and count frames
994
995 std::size_t previous_frame_number = 0;
996
997 std::size_t num_of_header_buffers = 0;
998 std::size_t num_of_body_buffers = 0;
999 incoming_frame * last_header_frame = NULL; // dummy initialization
1000 std::size_t header_part_of_last_header_frame = 0; // dummy initialization
1001 incoming_frame * first_body_frame = NULL;
1002 std::size_t body_offset_of_first_body_frame = 0; // dummy initialization
1003
1004 std::size_t header_size_so_far = 0;
1005 enum { header, body } state = header;
1006
1007 bool message_complete = true;
1008
1009 incoming_frame * frame = list->first_frame;
1010 while (frame != NULL)
1011 {
1012 if (frame->frame_number != previous_frame_number + 1)
1013 {
1014 // message is not yet complete (missing frames)
1015 // continue without error, wait for next opportunity
1016
1017 message_complete = false;
1018 frame = NULL;
1019 }
1020 else
1021 {
1022 if (state == header)
1023 {
1024 ++num_of_header_buffers;
1025
1026 if (header_size_so_far + frame->size >= list->header_size)
1027 {
1028 // this frame completes the message header
1029
1030 last_header_frame = frame;
1031 header_part_of_last_header_frame =
1032 list->header_size - header_size_so_far;
1033
1034 if (header_size_so_far + frame->size > list->header_size)
1035 {
1036 // this frame also contains some initial data
1037 // for the body
1038
1039 first_body_frame = frame;
1040 body_offset_of_first_body_frame =
1041 list->header_size - header_size_so_far;
1042
1043 ++num_of_body_buffers;
1044 }
1045
1046 state = body;
1047 }
1048 else
1049 {
1050 header_size_so_far += frame->size;
1051 }
1052 }
1053 else // state == body
1054 {
1055 if (first_body_frame == NULL)
1056 {
1057 first_body_frame = frame;
1058 body_offset_of_first_body_frame = 0;
1059 }
1060
1061 ++num_of_body_buffers;
1062 }
1063
1064 previous_frame_number = frame->frame_number;
1065 frame = frame->next;
1066 }
1067 }
1068
1069 if (message_complete)
1070 {
1071 const char * * header_buffers = static_cast<const char * *>(
1072 alloc_->allocate(num_of_header_buffers * sizeof(const char *)));
1073 std::size_t * header_buffer_sizes = static_cast<std::size_t *>(
1074 alloc_->allocate(num_of_header_buffers * sizeof(std::size_t)));
1075 const char * * body_buffers = static_cast<const char * *>(
1076 alloc_->allocate(num_of_body_buffers * sizeof(const char *)));
1077 std::size_t * body_buffer_sizes = static_cast<std::size_t *>(
1078 alloc_->allocate(num_of_body_buffers * sizeof(std::size_t)));
1079
1080 if (header_buffers != NULL &&
1081 header_buffer_sizes != NULL &&
1082 body_buffers != NULL &&
1083 body_buffer_sizes != NULL)
1084 {
1085 std::size_t total_bytes = 0;
1086 std::size_t i = 0;
1087 frame = list->first_frame;
1088 state = header;
1089 while (frame != NULL)
1090 {
1091 if (state == header)
1092 {
1093 header_buffers[i] = frame->data;
1094 if (frame != last_header_frame)
1095 {
1096 header_buffer_sizes[i] = frame->size;
1097
1098 ++i;
1099 }
1100 else
1101 {
1102 header_buffer_sizes[i] =
1103 header_part_of_last_header_frame;
1104
1105 state = body;
1106 i = 0;
1107
1108 if (frame == first_body_frame)
1109 {
1110 body_buffers[i] =
1111 frame->data + body_offset_of_first_body_frame;
1112 body_buffer_sizes[i] =
1113 frame->size - body_offset_of_first_body_frame;
1114
1115 ++i;
1116 }
1117 }
1118 }
1119 else // state == body
1120 {
1121 body_buffers[i] = frame->data;
1122 body_buffer_sizes[i] = frame->size;
1123
1124 ++i;
1125 }
1126
1127 total_bytes += frame->size + frame_head_size;
1128
1129 frame = frame->next;
1130 }
1131
1132 if (event_notification_callback_ != NULL)
1133 {
1134 try
1135 {
1136 event_notification_callback_(
1137 event_notification_hint_,
1138 core::message_received,
1139 target_, total_bytes);
1140 }
1141 catch (...)
1142 {
1143 // ignore errors from user callback
1144 }
1145 }
1146
1147 if (incoming_message_callback_ != NULL)
1148 {
1149 mtx_->unlock();
1150
1151 try
1152 {
1153 incoming_message_callback_(
1154 incoming_message_hint_,
1155 target_,
1156 header_buffers,
1157 header_buffer_sizes,
1158 num_of_header_buffers,
1159 body_buffers,
1160 body_buffer_sizes,
1161 num_of_body_buffers);
1162 }
1163 catch (...)
1164 {
1165 // ignore exceptions from user callbacks
1166 }
1167
1168 mtx_->lock();
1169 }
1170
1171 // the incoming frames are no longer needed
1172
1173 // detach this list from the rest
1174
1175 incoming_message_frame_list * previous_list = NULL;
1176 incoming_message_frame_list * * removal_point =
1177 &first_incoming_frame_list_;
1178 while (*removal_point != list)
1179 {
1180 previous_list = *removal_point;
1181 removal_point = &((*removal_point)->next_list);
1182 }
1183
1184 if (*removal_point == last_incoming_frame_list_)
1185 {
1186 // it is the last list of frames which is removed,
1187 // update the pointer to the rightmost list
1188
1189 last_incoming_frame_list_ = previous_list;
1190 }
1191
1192 *removal_point = (*removal_point)->next_list;
1193
1194 clean_incoming_frames(list);
1195 alloc_->deallocate(list);
1196
1197 res = core::ok;
1198 }
1199 else
1200 {
1201 res = core::no_memory;
1202 }
1203
1204 if (header_buffers != NULL)
1205 {
1206 alloc_->deallocate(header_buffers);
1207 }
1208 if (header_buffer_sizes != NULL)
1209 {
1210 alloc_->deallocate(header_buffer_sizes);
1211 }
1212 if (body_buffers != NULL)
1213 {
1214 alloc_->deallocate(body_buffers);
1215 }
1216 if (body_buffer_sizes != NULL)
1217 {
1218 alloc_->deallocate(body_buffer_sizes);
1219 }
1220 }
1221 else
1222 {
1223 // do not report errors on incomplete messages
1224 // wait for the next opportunity to test for completeness
1225
1226 res = core::ok;
1227 }
1228
1229 return res;
1230 }
1231
clean_incoming_messages()1232 void channel::clean_incoming_messages()
1233 {
1234 if (current_frame_payload_ != NULL)
1235 {
1236 alloc_->deallocate(current_frame_payload_);
1237 current_frame_payload_ = NULL;
1238 }
1239
1240 // clean all incoming lists
1241 while (first_incoming_frame_list_ != NULL)
1242 {
1243 clean_incoming_frames(first_incoming_frame_list_);
1244
1245 incoming_message_frame_list * next_list =
1246 first_incoming_frame_list_->next_list;
1247 alloc_->deallocate(first_incoming_frame_list_);
1248 first_incoming_frame_list_ = next_list;
1249 }
1250
1251 last_incoming_frame_list_ = NULL;
1252 }
1253
clean_incoming_frames(incoming_message_frame_list * list)1254 void channel::clean_incoming_frames(incoming_message_frame_list * list)
1255 {
1256 incoming_frame * frame = list->first_frame;
1257 while (frame != NULL)
1258 {
1259 alloc_->deallocate(frame->data);
1260
1261 incoming_frame * next = frame->next;
1262 alloc_->deallocate(frame);
1263 frame = next;
1264 }
1265 }
1266
get_first_outgoing_frame() const1267 outgoing_frame * channel::get_first_outgoing_frame() const
1268 {
1269 return first_outgoing_frame_;
1270 }
1271
get_last_outgoing_frame() const1272 outgoing_frame * channel::get_last_outgoing_frame() const
1273 {
1274 return last_outgoing_frame_;
1275 }
1276
get_first_incoming_frame_list() const1277 incoming_message_frame_list * channel::get_first_incoming_frame_list() const
1278 {
1279 return first_incoming_frame_list_;
1280 }
1281
get_last_incoming_frame_list() const1282 incoming_message_frame_list * channel::get_last_incoming_frame_list() const
1283 {
1284 return last_incoming_frame_list_;
1285 }
1286
get_incoming_state() const1287 incoming_fsm channel::get_incoming_state() const
1288 {
1289 return incoming_state_;
1290 }
1291
get_pending_outgoing_bytes() const1292 std::size_t channel::get_pending_outgoing_bytes() const
1293 {
1294 return pending_outgoing_bytes_;
1295 }
1296
process_complete_incoming_frame(const char * buffer,const std::size_t buffer_size)1297 core::result channel::process_complete_incoming_frame(
1298 const char * buffer, const std::size_t buffer_size)
1299 {
1300 core::result res;
1301
1302 // parse the header
1303
1304 if (buffer_size >= frame_head_size)
1305 {
1306 std::memcpy(frame_head_buffer_, buffer, frame_head_size);
1307 parse_incoming_frame_header();
1308
1309 if (current_frame_payload_size_ == buffer_size - frame_head_size)
1310 {
1311 // behave as if the whole payload has been already read
1312
1313 current_frame_payload_ = static_cast<char *>(
1314 alloc_->allocate(current_frame_payload_size_));
1315 if (current_frame_payload_ != NULL)
1316 {
1317 std::memcpy(current_frame_payload_,
1318 buffer + frame_head_size, current_frame_payload_size_);
1319
1320 // short messages fit entirely in a single frame,
1321 // so can be dispatched without creating
1322 // any additional data structures
1323
1324 if (current_frame_number_ == 1 && last_frame_)
1325 {
1326 res = dispatch_short_incoming_message();
1327 }
1328 else
1329 {
1330 res = store_incoming_frame();
1331 }
1332
1333 current_message_header_size_ = 0;
1334 }
1335 else
1336 {
1337 res = core::no_memory;
1338 }
1339 }
1340 else
1341 {
1342 // corrupted buffer
1343
1344 res = core::unexpected_value;
1345 }
1346 }
1347 else
1348 {
1349 // corrupted buffer
1350
1351 res = core::unexpected_value;
1352 }
1353
1354 return res;
1355 }
1356
do_some_input()1357 core::result channel::do_some_input()
1358 {
1359 core::result res;
1360 std::size_t readn;
1361
1362 if (protocol_ == proto_udp
1363 #ifdef YAMI4_WITH_QNX
1364 // added for completeness,
1365 // it is an error to attempt to read from QNX channel
1366 // (the error is reported by read_bytes)
1367 || protocol_ == proto_qnx
1368 #endif // YAMI4_WITH_QNX
1369 )
1370 {
1371 // This is a datagram protocol,
1372 // read the whole frame in a single operation.
1373
1374 res = read_bytes(datagram_buffer_, preferred_frame_size_, readn);
1375 if (res == core::ok)
1376 {
1377 res = process_complete_incoming_frame(datagram_buffer_, readn);
1378 }
1379 }
1380 else
1381 {
1382 // This is a stream protocol,
1383 // read the frame header or frame payload according
1384 // to the state machine.
1385
1386 if (incoming_state_ == read_frame_header)
1387 {
1388 res = read_bytes(frame_head_buffer_ + read_offset_,
1389 frame_head_size - read_offset_, readn);
1390 if (res == core::ok)
1391 {
1392 read_offset_ += readn;
1393
1394 if (read_offset_ == frame_head_size)
1395 {
1396 // the whole frame header has been read
1397
1398 parse_incoming_frame_header();
1399
1400 if (current_frame_payload_size_ != 0 &&
1401 current_frame_payload_size_ % 4 == 0)
1402 {
1403 current_frame_payload_ = static_cast<char *>(
1404 alloc_->allocate(current_frame_payload_size_));
1405 if (current_frame_payload_ != NULL)
1406 {
1407 read_offset_ = 0;
1408 incoming_state_ = read_frame_payload;
1409 }
1410 else
1411 {
1412 res = core::no_memory;
1413 }
1414 }
1415 else
1416 {
1417 res = core::unexpected_value;
1418 }
1419 }
1420 }
1421 }
1422 else // incoming_state_ == read_frame_payload
1423 {
1424 res = read_bytes(current_frame_payload_ + read_offset_,
1425 current_frame_payload_size_ - read_offset_, readn);
1426 if (res == core::ok)
1427 {
1428 read_offset_ += readn;
1429
1430 if (read_offset_ == current_frame_payload_size_)
1431 {
1432 // the whole payload has been read
1433
1434 // short messages fit entirely in a single frame,
1435 // so can be dispatched without creating
1436 // any additional data structures
1437
1438 if (current_frame_number_ == 1 && last_frame_)
1439 {
1440 res = dispatch_short_incoming_message();
1441 }
1442 else
1443 {
1444 res = store_incoming_frame();
1445 }
1446
1447 current_message_header_size_ = 0;
1448 read_offset_ = 0;
1449 incoming_state_ = read_frame_header;
1450 }
1451 }
1452 }
1453
1454 if (res == core::io_error)
1455 {
1456 // general I/O error
1457 // abandon all incoming frames, because they all
1458 // belong to messages that are still being read
1459 // and there is no hope of resynchronizing the stream
1460 // to get them right again
1461
1462 clean_incoming_messages();
1463 }
1464 }
1465
1466 return res;
1467 }
1468
1469 // common helper for stream and datagram channels,
1470 // called when the complete frame was pushed out
notify_progress_and_consume_frame_list()1471 void channel::notify_progress_and_consume_frame_list()
1472 {
1473 pending_outgoing_bytes_ -= first_outgoing_frame_->size;
1474
1475 const std::size_t sent_bytes =
1476 first_outgoing_frame_->byte_count;
1477 const std::size_t total_message_size =
1478 first_outgoing_frame_->total_byte_count;
1479
1480 if (event_notification_callback_ != NULL)
1481 {
1482 if (sent_bytes == total_message_size)
1483 {
1484 try
1485 {
1486 event_notification_callback_(
1487 event_notification_hint_,
1488 core::message_sent,
1489 target_, total_message_size);
1490 }
1491 catch (...)
1492 {
1493 // ignore errors from user callback
1494 }
1495 }
1496 }
1497
1498 core::message_progress_function progress_callback =
1499 first_outgoing_frame_->progress_callback;
1500 void * progress_hint =
1501 first_outgoing_frame_->progress_hint;
1502
1503 if (progress_callback != NULL)
1504 {
1505 mtx_->unlock();
1506
1507 try
1508 {
1509 progress_callback(progress_hint,
1510 sent_bytes, total_message_size);
1511 }
1512 catch (...)
1513 {
1514 // ignore exceptions from user callbacks
1515 }
1516
1517 mtx_->lock();
1518 }
1519
1520 outgoing_frame * next = first_outgoing_frame_->next;
1521
1522 alloc_->deallocate(first_outgoing_frame_->data);
1523 alloc_->deallocate(first_outgoing_frame_);
1524 first_outgoing_frame_ = next;
1525 output_frame_offset_ = 0;
1526 if (next == NULL)
1527 {
1528 // the list was exhausted
1529 last_outgoing_frame_ = NULL;
1530 }
1531 }
1532
do_some_output(bool & close_me)1533 core::result channel::do_some_output(bool & close_me)
1534 {
1535 if (first_outgoing_frame_ == NULL)
1536 {
1537 fatal_failure(__FILE__, __LINE__);
1538 }
1539
1540 core::result res;
1541
1542 // close the connection if the poison pill happens to be
1543 // at the beginning of the queue
1544
1545 if (first_outgoing_frame_->close_flag)
1546 {
1547 // there are no more frames in the queue
1548 if (first_outgoing_frame_->next != NULL)
1549 {
1550 fatal_failure(__FILE__, __LINE__);
1551 }
1552
1553 mode_ = hard_close;
1554 close_me = true;
1555 res = core::ok;
1556 }
1557 else
1558 {
1559 if (protocol_ == proto_udp
1560 #ifdef YAMI4_WITH_QNX
1561 || protocol_ == proto_qnx
1562 #endif // YAMI4_WITH_QNX
1563 )
1564 {
1565 // This is a datagram (UDP or QNX) channel,
1566 // write the whole frame in a single operation.
1567
1568 std::size_t written;
1569 res = write_bytes(
1570 first_outgoing_frame_->data + output_frame_offset_,
1571 first_outgoing_frame_->size - output_frame_offset_,
1572 written);
1573 if (res == core::ok)
1574 {
1575 // the whole frame has been pushed
1576
1577 notify_progress_and_consume_frame_list();
1578 }
1579 }
1580 else
1581 {
1582 // This is a stream channel.
1583
1584 std::size_t written;
1585 res = write_bytes(
1586 first_outgoing_frame_->data + output_frame_offset_,
1587 first_outgoing_frame_->size - output_frame_offset_,
1588 written);
1589 if (res == core::ok)
1590 {
1591 output_frame_offset_ += written;
1592
1593 if (output_frame_offset_ == first_outgoing_frame_->size)
1594 {
1595 // the whole frame has been already pushed
1596
1597 notify_progress_and_consume_frame_list();
1598 }
1599 }
1600 }
1601
1602 if (res != core::ok)
1603 {
1604 // general I/O error
1605 // request close
1606 // (all frames in the output queue will be cleaned up
1607 // and progress will be notified as part of the channel closing)
1608
1609 mode_ = hard_close;
1610 close_me = true;
1611 }
1612 }
1613
1614 return res;
1615 }
1616