1 // Copyright Maciej Sobczak 2008-2019.
2 // This file is part of YAMI4.
3 //
4 // YAMI4 is free software: you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation, either version 3 of the License, or
7 // (at your option) any later version.
8 //
9 // YAMI4 is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with YAMI4. If not, see <http://www.gnu.org/licenses/>.
16
17 #include "channel_group.h"
18 #include "allocator.h"
19 #include "channel.h"
20 #include "channel_holder.h"
21 #include "listener.h"
22 #include "options.h"
23 #include "outgoing_frame.h"
24 #include "parameters.h"
25 #include "serialization.h"
26 #include <cstring>
27
28 using namespace yami;
29 using namespace details;
30
31 namespace // unnamed namespace
32 {
33
34 const std::size_t frame_header_size = 16; // four words
35
36 // helper for serialization, deallocates (deeply) the array of frames
deallocate_array_of_frames(allocator & alloc,char ** buffers,std::size_t * buffer_sizes,std::size_t number_of_frames)37 void deallocate_array_of_frames(allocator & alloc,
38 char * * buffers, std::size_t * buffer_sizes,
39 std::size_t number_of_frames)
40 {
41 if (buffers != NULL)
42 {
43 for (std::size_t i = 0; i != number_of_frames; ++i)
44 {
45 if (buffers[i] != NULL)
46 {
47 alloc.deallocate(buffers[i]);
48 }
49 }
50
51 alloc.deallocate(buffers);
52 }
53
54 if (buffer_sizes != NULL)
55 {
56 alloc.deallocate(buffer_sizes);
57 }
58 }
59
60 // helper for serialization, prepares the array of outgoing frames
61 // with properly allocated data segments
// Helper for serialization: prepares the array of outgoing frames with
// properly allocated data segments and pre-filled frame headers.
//
// On success the caller owns the returned buffers / buffer_sizes arrays
// (deeply); on failure the partially allocated structure is still handed
// back through the output parameters and must be released with
// deallocate_array_of_frames.
//
// Output parameters:
//   index_of_border_frame      - index of the frame that holds the first
//                                byte of the message body
//   body_offset_in_border_frame - offset of that first body byte within
//                                the border frame (counted from the frame
//                                start, i.e. including the header)
//   size_of_last_frame         - total size of the final frame (payload
//                                remainder plus header)
//   number_of_frames           - total number of frames in the message
//   buffers, buffer_sizes      - arrays describing each allocated frame
core::result prepare_array_of_frames(allocator & alloc,
    std::size_t message_id,
    std::size_t header_total_buffer_size,
    std::size_t body_total_buffer_size,
    std::size_t preferred_frame_size,
    std::size_t & index_of_border_frame,
    std::size_t & body_offset_in_border_frame,
    std::size_t & size_of_last_frame,
    std::size_t & number_of_frames,
    char * * & buffers, std::size_t * & buffer_sizes)
{
    core::result res;

    // usable bytes in each frame, after the fixed 16-byte frame header
    const std::size_t frame_payload_size =
        preferred_frame_size - frame_header_size;

    const std::size_t total_buffer_size =
        header_total_buffer_size + body_total_buffer_size;

    // this is the index of frame where the first byte of body data is located
    index_of_border_frame = header_total_buffer_size / frame_payload_size;

    body_offset_in_border_frame =
        header_total_buffer_size % frame_payload_size + frame_header_size;

    // ceiling division: every started payload chunk needs its own frame
    number_of_frames =
        (total_buffer_size + frame_payload_size - 1) / frame_payload_size;

    // the last frame carries only the remaining payload bytes
    // (the "- 1 ... + 1" form maps an exact multiple to a full frame
    // instead of zero)
    size_of_last_frame =
        (total_buffer_size - 1) % frame_payload_size + 1 + frame_header_size;

    // allocate the array of buffers

    buffers = static_cast<char * *>(
        alloc.allocate(number_of_frames * sizeof(char *)));
    if (buffers != NULL)
    {
        // clear the pointers so that a partially filled array can be
        // safely deep-deallocated by the caller on failure
        for (std::size_t i = 0; i != number_of_frames; ++i)
        {
            buffers[i] = NULL;
        }
    }

    buffer_sizes = static_cast<std::size_t *>(
        alloc.allocate(number_of_frames * sizeof(std::size_t)));
    if (buffers != NULL && buffer_sizes != NULL)
    {

        // allocate the data segment for each outgoing frame

        res = core::ok;
        for (std::size_t i = 0; res == core::ok && i != number_of_frames; ++i)
        {
            std::size_t size_of_this_frame;

            if (i != number_of_frames - 1)
            {
                // not the last frame, ask for full preferred size

                size_of_this_frame = preferred_frame_size;
            }
            else
            {
                // last frame, possibly shorter

                size_of_this_frame = size_of_last_frame;
            }

            buffer_sizes[i] = size_of_this_frame;
            buffers[i] = static_cast<char *>(
                alloc.allocate(size_of_this_frame));
            if (buffers[i] != NULL)
            {
                // fill the frame header with proper information

                // frame number start with 1
                int frame_number = i + 1;

                if (i == number_of_frames - 1)
                {
                    // this is the last frame in the whole message
                    // use negative frame number to mark this

                    frame_number *= -1;
                }

                fill_outgoing_frame_header(buffers[i],
                    message_id,
                    frame_number,
                    header_total_buffer_size,
                    size_of_this_frame - frame_header_size); // frame payload
            }
            else
            {
                // cannot allocate data segment

                res = core::no_memory;
            }
        }
    }
    else
    {
        res = core::no_memory;
    }

    return res;
}
170
171 // helper for preparing array of buffer pointers and buffer sizes
172 // that will be used for serialization
173 // they are offset to take into account space for frame headers
prepare_array_of_serialize_buffer_pointers(allocator & alloc,char ** buffers,std::size_t * buffer_sizes,std::size_t number_of_frames,char ** & serialize_buffers,std::size_t * & serialize_buffer_sizes)174 core::result prepare_array_of_serialize_buffer_pointers(
175 allocator & alloc,
176 char * * buffers, std::size_t * buffer_sizes,
177 std::size_t number_of_frames,
178 char * * & serialize_buffers, std::size_t * & serialize_buffer_sizes)
179 {
180 core::result res = core::ok;
181
182 serialize_buffers = static_cast<char * *>(
183 alloc.allocate(number_of_frames * sizeof(char *)));
184 if (serialize_buffers != NULL)
185 {
186 for (std::size_t i = 0; i != number_of_frames; ++i)
187 {
188 // the serialization buffer starts right after the frame header
189 serialize_buffers[i] = buffers[i] + frame_header_size;
190 }
191 }
192 else
193 {
194 res = core::no_memory;
195 }
196
197 if (res == core::ok)
198 {
199 serialize_buffer_sizes = static_cast<std::size_t *>(
200 alloc.allocate(number_of_frames * sizeof(std::size_t)));
201 if (serialize_buffer_sizes != NULL)
202 {
203 for (std::size_t i = 0; i != number_of_frames; ++i)
204 {
205 serialize_buffer_sizes[i] =
206 buffer_sizes[i] - frame_header_size;
207 }
208 }
209 else
210 {
211 res = core::no_memory;
212 }
213 }
214
215 return res;
216 }
217
218 // helper function for serializing the new message and injecting the
219 // output frames into the output queue of the given channel
serialize_and_post(allocator & alloc,channel & ch,std::size_t message_id,std::size_t priority,const core::serializable & message_header,const core::serializable & message_body,bool & first_frame,core::message_progress_function progress_callback,void * progress_hint)220 core::result serialize_and_post(allocator & alloc,
221 channel & ch, std::size_t message_id, std::size_t priority,
222 const core::serializable & message_header,
223 const core::serializable & message_body,
224 bool & first_frame,
225 core::message_progress_function progress_callback,
226 void * progress_hint)
227 {
228 // note:
229 // There are two arrays in use:
230 // 1. the array of pointers to buffers that will be used as complete
231 // frames and will be injected to the outgoing queue
232 // of the given channel - this array owns the buffers
233 // 2. the array of pointers to buffers that are used for serialization
234 // these pointers are offset with regard to those in array 1. to
235 // take into account the necessary space for frame headers
236 // this array does not own the actual buffers
237
238 core::result res;
239
240 const std::size_t preferred_frame_size = ch.get_frame_size();
241
242 std::size_t header_total_buffer_size;
243 std::size_t body_total_buffer_size;
244
245 res = message_header.get_serialize_buffer_size(header_total_buffer_size);
246 if (res == core::ok)
247 {
248 res = message_body.get_serialize_buffer_size(body_total_buffer_size);
249 }
250
251 // prepare the array of buffers for all outgoing frames
252
253 std::size_t number_of_frames;
254 std::size_t index_of_border_frame;
255 std::size_t body_offset_in_border_frame;
256 std::size_t size_of_last_frame;
257 char * * buffers = NULL;
258 std::size_t * buffer_sizes = NULL;
259 if (res == core::ok)
260 {
261 res = prepare_array_of_frames(alloc, message_id,
262 header_total_buffer_size, body_total_buffer_size,
263 preferred_frame_size,
264 index_of_border_frame, body_offset_in_border_frame,
265 size_of_last_frame, number_of_frames,
266 buffers, buffer_sizes);
267 }
268
269 // prepare the array of pointers for serialization
270
271 char * * serialize_buffers = NULL;
272 std::size_t * serialize_buffer_sizes = NULL;
273 if (res == core::ok)
274 {
275 res = prepare_array_of_serialize_buffer_pointers(alloc,
276 buffers, buffer_sizes, number_of_frames,
277 serialize_buffers, serialize_buffer_sizes);
278 }
279
280 // serialize the message header
281 if (res == core::ok)
282 {
283 // note: it is allowed to provide more buffers than are actually
284 // needed for serialization
285
286 res = message_header.serialize(
287 serialize_buffers, serialize_buffer_sizes, number_of_frames);
288 }
289
290 // serialize the message body
291 if (res == core::ok)
292 {
293 // the buffer pointer that will be used as the first one
294 // for serializing the message body needs to be ammended
295 // in order to take into account the tail of message header data
296
297 serialize_buffers[index_of_border_frame] =
298 buffers[index_of_border_frame] + body_offset_in_border_frame;
299 serialize_buffer_sizes[index_of_border_frame] =
300 buffer_sizes[index_of_border_frame] - body_offset_in_border_frame;
301
302 res = message_body.serialize(
303 serialize_buffers + index_of_border_frame,
304 serialize_buffer_sizes + index_of_border_frame,
305 number_of_frames - index_of_border_frame);
306 }
307
308 // post new outgoing frames to the given channel
309
310 if (res == core::ok)
311 {
312 res = ch.post(priority,
313 buffers, buffer_sizes, number_of_frames,
314 first_frame,
315 progress_callback, progress_hint);
316 }
317
318 // clean all helper buffers
319 if (serialize_buffers != NULL)
320 {
321 alloc.deallocate(serialize_buffers);
322 }
323
324 if (serialize_buffer_sizes != NULL)
325 {
326 alloc.deallocate(serialize_buffer_sizes);
327 }
328
329 if (res == core::ok)
330 {
331 // actual buffers were transferred to the channel,
332 // do not deallocate them
333 alloc.deallocate(buffers);
334 alloc.deallocate(buffer_sizes);
335 }
336 else
337 {
338 // buffers were not transferred or there was some other error
339 // -> deallocate deeply the whole structure
340 deallocate_array_of_frames(alloc,
341 buffers, buffer_sizes, number_of_frames);
342 }
343
344 return res;
345 }
346
347 } // unnamed namespace
348
init(allocator & alloc,const core::parameters * configuration_options,core::incoming_message_dispatch_function dispatch_callback,void * dispatch_hint,core::closed_connection_function disconnection_hook,void * disconnection_hook_hint)349 core::result channel_group::init(allocator & alloc,
350 const core::parameters * configuration_options,
351 core::incoming_message_dispatch_function dispatch_callback,
352 void * dispatch_hint,
353 core::closed_connection_function disconnection_hook,
354 void * disconnection_hook_hint)
355 {
356 channel_holders_ = NULL;
357 channels_num_ = 0;
358 shadow_channels_ = NULL;
359 shadow_channels_num_ = 0;
360 first_listener_ = NULL;
361 #ifdef YAMI4_WITH_QNX
362 qnx_listener_channel_id_ = 0;
363 #endif // YAMI4_WITH_QNX
364 alloc_ = &alloc;
365 closing_ = false;
366 last_message_id_ = 0;
367 incoming_message_callback_ = dispatch_callback;
368 incoming_message_hint_ = dispatch_hint;
369 disconnection_hook_ = disconnection_hook;
370 disconnection_hook_hint_ = disconnection_hook_hint;
371 event_notification_callback_ = NULL;
372 io_error_callback_ = NULL;
373
374 configuration_options_.init(configuration_options);
375 mtx_.init();
376
377 core::result res = selector_.init(alloc);
378
379 #ifdef YAMI4_WITH_OPEN_SSL
380 ssl_ctx_ = NULL;
381 if (res == core::ok)
382 {
383 // the SSL context is always created;
384 // but the incoming SSL connections will be possible
385 // only with proper certificate
386
387 (void)SSL_library_init();
388
389 ssl_ctx_ = SSL_CTX_new(SSLv23_method());
390 if (ssl_ctx_ != NULL)
391 {
392 SSL_CTX_set_verify(ssl_ctx_, SSL_VERIFY_NONE, NULL);
393
394 if (configuration_options_.ssl_certificate_file[0] != '\0' &&
395 configuration_options_.ssl_private_key_file[0] != '\0')
396 {
397 int ssl_ret1 = SSL_CTX_use_certificate_file(ssl_ctx_,
398 configuration_options_.ssl_certificate_file,
399 SSL_FILETYPE_PEM);
400 int ssl_ret2 = SSL_CTX_use_PrivateKey_file(ssl_ctx_,
401 configuration_options_.ssl_private_key_file,
402 SSL_FILETYPE_PEM);
403
404 if (ssl_ret1 != 1 || ssl_ret2 != 1)
405 {
406 SSL_CTX_free(ssl_ctx_);
407 ssl_ctx_ = NULL;
408
409 res = core::unexpected_value;
410 }
411 }
412 }
413 else
414 {
415 res = core::not_enough_space;
416 }
417 }
418 #endif // YAMI4_WITH_OPEN_SSL
419
420 return res;
421 }
422
// Installs (or replaces) the user callback for agent-level event
// notifications; the hint is passed back verbatim on every invocation.
// NOTE(review): the two assignments are not synchronized with mtx_ -
// presumably this is meant to be called before concurrent use begins;
// confirm against the callers.
void channel_group::install_event_notifications(
    core::event_notification_function event_notification_callback,
    void * event_notification_hint)
{
    event_notification_callback_ = event_notification_callback;
    event_notification_hint_ = event_notification_hint;
}
430
// Installs (or replaces) the user callback for I/O error logging and
// forwards the same callback/hint pair to the internal selector, so that
// both this group and the selector report through the same logger.
void channel_group::install_io_error_logger(
    core::io_error_function io_error_callback,
    void * io_error_callback_hint)
{
    io_error_callback_ = io_error_callback;
    io_error_callback_hint_ = io_error_callback_hint;

    selector_.install_io_error_logger(
        io_error_callback, io_error_callback_hint);
}
441
// Final teardown of the whole channel group: closes every channel and
// listener, fires the relevant user notifications, and cleans internal
// resources (selector, mutex, SSL context).
void channel_group::clean(bool uses_private_area)
{
    // note: if uses_private_area is false, then the memory needs to be
    // deallocated, because it was acquired from the global free store
    // otherwise (uses_private_area == true) all structures were
    // created in the private storage and need not be deallocated

    // in any case, all files and sockets need to be physically closed

    // block any further operations on this group
    closing_ = true;

    mtx_.lock();

    // clean the channels
    for (std::size_t i = 0; i != channels_num_; ++i)
    {
        channel_holder & ch_holder = channel_holders_[i];

        channel * ch = ch_holder.get_channel();
        if (ch != NULL)
        {
            // take over the target name so it can still be reported
            // to the user callbacks after the channel itself is gone
            const char * target = ch->move_target();

            if (uses_private_area)
            {
                // it is enough to close the physical connection
                ch->close_connection();
            }
            else
            {
                // force full cleanup of the channel
                ch->clean();
                alloc_->deallocate(ch);
            }

            if (event_notification_callback_ != NULL)
            {
                try
                {
                    event_notification_callback_(
                        event_notification_hint_,
                        core::connection_closed,
                        target, 0);
                }
                catch (...)
                {
                    // ignore errors from user callback
                }
            }

            if (disconnection_hook_ != NULL)
            {
                // the user hook is called with the mutex released,
                // so that it may call back into the agent
                mtx_.unlock();

                try
                {
                    disconnection_hook_(disconnection_hook_hint_,
                        target, core::channel_closed);
                }
                catch (...)
                {
                    // ignore errors from user callback
                }

                mtx_.lock();
            }

            if (uses_private_area == false)
            {
                alloc_->deallocate(target);
            }
        }
    }

    if (uses_private_area == false)
    {
        alloc_->deallocate(channel_holders_);
        alloc_->deallocate(shadow_channels_);
    }

    // clean the list of listeners
    while (first_listener_ != NULL)
    {
        // remember the next node before the current one is destroyed
        listener * next = first_listener_->next;

        if (event_notification_callback_ != NULL)
        {
            try
            {
                event_notification_callback_(
                    event_notification_hint_,
                    core::listener_removed,
                    first_listener_->get_target(), 0);
            }
            catch (...)
            {
                // ignore errors from user callback
            }
        }

        if (uses_private_area)
        {
            // it is enough to close the listening socket
            first_listener_->close_resource();
        }
        else
        {
            // force full cleanup
            first_listener_->clean();
            alloc_->deallocate(first_listener_);
        }

        first_listener_ = next;
    }

    mtx_.unlock();

    // clean the selector (it has internal pipe that needs to be closed)

    selector_.clean();
    mtx_.clean();

    if (event_notification_callback_ != NULL)
    {
        try
        {
            event_notification_callback_(
                event_notification_hint_,
                core::agent_closed,
                NULL, 0);
        }
        catch (...)
        {
            // ignore errors from user callback
        }
    }

#ifdef YAMI4_WITH_OPEN_SSL
    if (ssl_ctx_ != NULL)
    {
        SSL_CTX_free(ssl_ctx_);
        ssl_ctx_ = NULL;
    }
#endif // YAMI4_WITH_OPEN_SSL
}
587
// Opens (or reuses) a channel to the given target.
// Returns the channel descriptor and reports through created_new_channel
// whether a new physical connection was actually established.
// do_lock tells whether this function should acquire mtx_ itself
// (false when the caller already holds it).
core::result channel_group::open(const char * target,
    core::channel_descriptor & new_channel,
    bool & created_new_channel, bool do_lock,
    const core::parameters * overriding_options)
{
    // note:
    // This function can be called by multiple threads.
    // In order to allow for higher concurrency it can lead to
    // multiple connections being created for the same target,
    // although only one such connection is ultimately kept open.

    core::result res;
    if (closing_)
    {
        res = core::bad_state;
    }
    else
    {
        std::size_t index;
        std::size_t sequence_number;
        channel * existing_channel;

        if (do_lock)
        {
            mtx_.lock();
        }

        // try to find the existing channel with the same target
        res = find_existing_channel(
            target, index, sequence_number, existing_channel);

        if (do_lock)
        {
            mtx_.unlock();
        }

        if (res == core::ok)
        {
            // fast path: the channel already exists, just hand out
            // its descriptor
            new_channel = core::channel_descriptor(index, sequence_number);
            created_new_channel = false;
        }
        else
        {
            // no channel with this target exists,
            // initialize a new channel in some empty slot

            channel * ch = static_cast<channel *>(
                alloc_->allocate(sizeof(channel)));

            if (ch != NULL)
            {
                // initialize channel outside of the critical section
                // - this can lead to multiple channels being open

                res = ch->init(*alloc_, mtx_,
                    configuration_options_, overriding_options,
                    target,
                    incoming_message_callback_, incoming_message_hint_,
                    event_notification_callback_, event_notification_hint_,
                    io_error_callback_, io_error_callback_hint_);

#ifdef YAMI4_WITH_QNX
                if (ch->get_protocol() == proto_qnx)
                {
                    ch->set_default_qnx_listening_channel_id(
                        qnx_listener_channel_id_);

                    ch->set_selector(selector_);
                }
#endif // YAMI4_WITH_QNX

#ifdef YAMI4_WITH_OPEN_SSL
                if (res == core::ok)
                {
                    // for secure channels, attach a client-side SSL
                    // object to the freshly created socket
                    protocol prot = ch->get_protocol();
                    if (prot == proto_tcps)
                    {
                        SSL * ssl = SSL_new(ssl_ctx_);
                        if (ssl != NULL)
                        {
                            io_descriptor_type fd = ch->get_io_descriptor();

                            int ssl_ret = SSL_set_fd(ssl, fd);
                            if (ssl_ret == 1)
                            {
                                ch->set_client_ssl(ssl);
                            }
                            else
                            {
                                SSL_free(ssl);

                                ch->clean();

                                res = core::io_error;
                            }
                        }
                        else
                        {
                            ch->clean();

                            res = core::not_enough_space;
                        }
                    }
                }
#endif // YAMI4_WITH_OPEN_SSL

                if (res == core::ok)
                {
                    bool channel_added = false;

                    if (do_lock)
                    {
                        mtx_.lock();
                    }

                    // re-check to verify that the channel
                    // is not yet in the set
                    res = find_existing_channel(
                        target, index, sequence_number, existing_channel);

                    if (res == core::ok)
                    {
                        // the channel with this name is in the set
                        // (probably just added by another thread)

                        // return the descriptor for that existing channel

                        new_channel = core::channel_descriptor(
                            index, sequence_number);
                        created_new_channel = false;
                    }
                    else
                    {
                        res = find_unused_channel(index, true);
                        if (res == core::ok)
                        {
                            channel_holders_[index].set(ch, sequence_number);
                            new_channel = core::channel_descriptor(
                                index, sequence_number);

                            created_new_channel = true;
                            channel_added = true;

                            if (event_notification_callback_ != NULL)
                            {
                                try
                                {
                                    event_notification_callback_(
                                        event_notification_hint_,
                                        core::outgoing_connection_open,
                                        target, 0);
                                }
                                catch (...)
                                {
                                    // ignore errors from user callback
                                }
                            }
                        }
                    }

                    if (do_lock)
                    {
                        mtx_.unlock();
                    }

                    if (channel_added)
                    {
                        // notify the selector so that it can
                        // take the new channel into account

                        res = selector_.interrupt();
                    }
                    else
                    {
                        // channel was initialized,
                        // but not included in the set
                        // (either it lost the race with another thread
                        // or no free slot was found)

                        ch->clean();
                        alloc_->deallocate(ch);
                    }
                }
                else
                {
                    // channel was allocated but not initialized properly

                    alloc_->deallocate(ch);
                }
            }
            else
            {
                res = core::no_memory;
            }
        }
    }

    return res;
}
785
is_open(const char * target,core::channel_descriptor & existing_channel) const786 core::result channel_group::is_open(const char * target,
787 core::channel_descriptor & existing_channel) const
788 {
789 core::result res;
790 if (closing_)
791 {
792 res = core::bad_state;
793 }
794 else
795 {
796 std::size_t index;
797 std::size_t sequence_number;
798 channel * dummy_ch;
799
800 mtx_.lock();
801
802 res = find_existing_channel(
803 target, index, sequence_number, dummy_ch);
804
805 if (res == core::ok)
806 {
807 existing_channel =
808 core::channel_descriptor(index, sequence_number);
809 }
810
811 mtx_.unlock();
812 }
813
814 return res;
815 }
816
817 // synchronized by caller
// Registers a channel for an already established (incoming) connection,
// typically accepted by a listener. The target must not yet exist in the
// set - an existing entry is treated as an I/O hazard.
core::result channel_group::add_existing(
    char * target, io_descriptor_type fd, protocol prot,
    std::size_t preferred_frame_size,
    core::channel_descriptor & new_descriptor,
    channel * & new_channel)
{
    core::result res;
    if (closing_)
    {
        res = core::bad_state;
    }
    else
    {
        // make sure that no channel for this target already exists

        std::size_t index;
        std::size_t sequence_number;
        channel * dummy_ch;

        // try to find the existing channel with the same target
        // (it should not exist)
        res = find_existing_channel(target,
            index, sequence_number, dummy_ch);

        if (res == core::ok)
        {
            // reject the new channel, there is some I/O hazard
            // (it is not normal for the listener to accept a channel
            // for the target that already exists)
            res = core::io_error;
        }
        else
        {
            // initialize a new channel in some empty slot

            res = find_unused_channel(index, false);
            if (res == core::ok)
            {
                channel * ch = static_cast<channel *>(
                    alloc_->allocate(sizeof(channel)));
                if (ch != NULL)
                {
                    // NOTE(review): the result of this init overload is
                    // ignored here, unlike in open() - confirm whether
                    // this overload can actually fail
                    ch->init(*alloc_, mtx_, configuration_options_,
                        target, fd, prot, preferred_frame_size,
                        incoming_message_callback_, incoming_message_hint_,
                        event_notification_callback_,
                        event_notification_hint_,
                        io_error_callback_, io_error_callback_hint_);

#ifdef YAMI4_WITH_QNX
                    if (ch->get_protocol() == proto_qnx)
                    {
                        ch->set_default_qnx_listening_channel_id(
                            qnx_listener_channel_id_);

                        ch->set_selector(selector_);
                    }
#endif // YAMI4_WITH_QNX

                    // NOTE(review): sequence_number is the value left by
                    // the failed lookup above (same pattern as in open());
                    // presumably the holder machinery assigns the real
                    // sequence - verify against channel_holder::set
                    channel_holders_[index].set(ch, sequence_number);

                    new_descriptor =
                        core::channel_descriptor(index, sequence_number);

                    new_channel = ch;

                    if (event_notification_callback_ != NULL)
                    {
                        try
                        {
                            event_notification_callback_(
                                event_notification_hint_,
                                core::incoming_connection_open,
                                target, 0);
                        }
                        catch (...)
                        {
                            // ignore errors from user callback
                        }
                    }
                }
                else
                {
                    res = core::no_memory;
                }
            }
        }
    }

    return res;
}
909
910 #ifdef YAMI4_WITH_OPEN_SSL
911 // synchronized by caller
add_existing_ssl(char * target,io_descriptor_type fd,protocol prot,std::size_t preferred_frame_size,core::channel_descriptor & new_descriptor)912 core::result channel_group::add_existing_ssl(
913 char * target, io_descriptor_type fd, protocol prot,
914 std::size_t preferred_frame_size,
915 core::channel_descriptor & new_descriptor)
916 {
917 core::result res;
918
919 SSL * ssl = SSL_new(ssl_ctx_);
920 if (ssl != NULL)
921 {
922 int ssl_ret = SSL_set_fd(ssl, fd);
923 if (ssl_ret == 1)
924 {
925 channel * new_channel;
926 res = add_existing(target,
927 fd, prot, preferred_frame_size, new_descriptor, new_channel);
928 if (res == core::ok)
929 {
930 new_channel->set_server_ssl(ssl);
931 }
932 }
933 else
934 {
935 SSL_free(ssl);
936
937 res = core::io_error;
938 }
939 }
940 else
941 {
942 res = core::not_enough_space;
943 }
944
945 return res;
946 }
947 #endif // YAMI4_WITH_OPEN_SSL
948
post(core::channel_descriptor cd,const core::serializable & message_header,const core::serializable & message_body,std::size_t priority,core::message_progress_function progress_callback,void * progress_hint)949 core::result channel_group::post(
950 core::channel_descriptor cd,
951 const core::serializable & message_header,
952 const core::serializable & message_body,
953 std::size_t priority,
954 core::message_progress_function progress_callback,
955 void * progress_hint)
956 {
957 core::result res;
958 bool first_frame;
959 if (closing_)
960 {
961 res = core::bad_state;
962 }
963 else
964 {
965 res = core::no_such_index;
966
967 std::size_t index;
968 std::size_t sequence_number;
969
970 cd.get_details(index, sequence_number);
971
972 mtx_.lock();
973
974 if (index < channels_num_)
975 {
976 channel_holder & ch_holder = channel_holders_[index];
977 channel * ch = ch_holder.get_channel();
978
979 if (ch != NULL)
980 {
981 if (ch_holder.get_sequence_number() == sequence_number)
982 {
983 const std::size_t message_id = generate_message_id();
984
985 res = serialize_and_post(*alloc_,
986 *ch, message_id, priority,
987 message_header, message_body,
988 first_frame,
989 progress_callback, progress_hint);
990 }
991 }
992 }
993
994 mtx_.unlock();
995 }
996
997 if (res == core::ok && first_frame)
998 {
999 // interrupt selector so that it can perform an output operation
1000
1001 res = selector_.interrupt();
1002 }
1003
1004 return res;
1005 }
1006
// Serializes and posts a message to the channel identified by its
// target name; fails if no channel for that target is currently open.
core::result channel_group::post(
    const char * target,
    const core::serializable & message_header,
    const core::serializable & message_body,
    std::size_t priority,
    core::message_progress_function progress_callback,
    void * progress_hint)
{
    core::result res;

    // initialized defensively - the flag is only meaningful after a
    // successful serialize_and_post, but it is read below and must not
    // hold an indeterminate value
    bool first_frame = false;

    if (closing_)
    {
        res = core::bad_state;
    }
    else
    {
        std::size_t index;
        std::size_t dummy_sequence_number;
        channel * ch;

        mtx_.lock();

        // try to find the existing channel with the same target
        res = find_existing_channel(
            target, index, dummy_sequence_number, ch);

        if (res == core::ok)
        {
            const std::size_t message_id = generate_message_id();

            res = serialize_and_post(*alloc_,
                *ch, message_id, priority, message_header, message_body,
                first_frame,
                progress_callback, progress_hint);
        }

        mtx_.unlock();
    }

    if (res == core::ok && first_frame)
    {
        // interrupt selector so that it can perform an output operation

        res = selector_.interrupt();
    }

    return res;
}
1055
// Closes the channel identified by the given descriptor.
// hard_close forces an immediate close; otherwise the close is posted
// with the given priority and happens after pending traffic is flushed.
// A stale or unknown descriptor is not an error.
core::result channel_group::close(
    core::channel_descriptor cd, bool hard_close, std::size_t priority)
{
    core::result res;
    if (closing_)
    {
        res = core::bad_state;
    }
    else
    {
        std::size_t index;
        std::size_t sequence_number;

        cd.get_details(index, sequence_number);

        // do not report errors if channel does not exist

        mtx_.lock();

        res = core::ok;
        if (index < channels_num_)
        {
            channel_holder & ch_holder = channel_holders_[index];
            channel * ch = ch_holder.get_channel();

            if (ch != NULL)
            {
                // proceed only when the descriptor is not stale
                if (ch_holder.get_sequence_number() == sequence_number)
                {
                    if (hard_close)
                    {
                        res = do_hard_close(ch, index);
                    }
                    else
                    {
                        res = do_close(ch, priority, index);
                    }
                }
            }
        }

        mtx_.unlock();
    }

    return res;
}
1102
// Closes the channel identified by its target name.
// hard_close forces an immediate close; otherwise the close is posted
// with the given priority. A non-existing target is not an error.
core::result channel_group::close(const char * target,
    bool hard_close, std::size_t priority)
{
    core::result res;
    if (closing_)
    {
        res = core::bad_state;
    }
    else
    {
        std::size_t index;
        std::size_t dummy_sequence_number;
        channel * ch;

        mtx_.lock();

        // try to find the existing channel with the same target
        res = find_existing_channel(target,
            index, dummy_sequence_number, ch);

        if (res == core::ok)
        {
            if (hard_close)
            {
                res = do_hard_close(ch, index);
            }
            else
            {
                res = do_close(ch, priority, index);
            }
        }
        else if (res == core::no_such_name)
        {
            // do not report errors if channel does not exist
            res = core::ok;
        }

        mtx_.unlock();
    }

    return res;
}
1145
1146 // synchronized by caller
do_close(channel * ch,std::size_t priority,std::size_t index)1147 core::result channel_group::do_close(
1148 channel * ch, std::size_t priority, std::size_t index)
1149 {
1150 bool close_me = false;
1151 core::result res = ch->post_close(priority, close_me);
1152
1153 if (res == core::ok && close_me)
1154 {
1155 // immediate close
1156
1157 res = do_hard_close(ch, index);
1158 }
1159
1160 return res;
1161 }
1162
1163 // synchronized by caller
do_hard_close(channel * ch,std::size_t index)1164 core::result channel_group::do_hard_close(channel * ch, std::size_t index)
1165 {
1166 core::result res = core::ok;
1167
1168 bool destroyed = channel_dec_ref(index, ch);
1169 if (destroyed == false)
1170 {
1171 // channel was not destroyed,
1172 // because there are more references to it
1173 // -> wake up the worker, so it can finalize the process
1174
1175 res = interrupt_work_waiter();
1176 }
1177
1178 return res;
1179 }
1180
// Creates and registers a new listener for the given target.
// On success the optionally returned resolved_target points to the
// listener-owned resolved name (valid as long as the listener lives).
core::result channel_group::add_listener(const char * target,
    core::new_incoming_connection_function connection_hook,
    void * connection_hook_hint,
    const char * * resolved_target)
{
    core::result res;
    if (closing_)
    {
        res = core::bad_state;
    }
    else
    {
        listener * new_listener = static_cast<listener *>(
            alloc_->allocate(sizeof(listener)));

        // preparation of the listener is potentially blocking
        // and is performed with mutex unlocked

        if (new_listener != NULL)
        {
            new_listener->init(*alloc_, *this, mtx_,
                connection_hook, connection_hook_hint,
                io_error_callback_, io_error_callback_hint_);

            // resolves the target and opens the listening socket
            res = new_listener->prepare(target);
            if (res == core::ok)
            {
                const char * res_target = new_listener->get_target();

                if (resolved_target != NULL)
                {
                    *resolved_target = res_target;
                }

                // add new listener to the list
                // (prepended at the head, under the mutex)

                mtx_.lock();

                new_listener->next = first_listener_;
                first_listener_ = new_listener;

                mtx_.unlock();

                if (event_notification_callback_ != NULL)
                {
                    try
                    {
                        event_notification_callback_(
                            event_notification_hint_,
                            core::listener_added,
                            res_target, 0);
                    }
                    catch (...)
                    {
                        // ignore errors from user callback
                    }
                }

                // notify the selector so that it can
                // take the new listener into account

                res = selector_.interrupt();
            }
            else
            {
                // preparation failed - release the raw memory
                alloc_->deallocate(new_listener);
            }
        }
        else
        {
            res = core::no_memory;
        }
    }

    return res;
}
1257
remove_listener(const char * target)1258 void channel_group::remove_listener(const char * target)
1259 {
1260 if (closing_ == false)
1261 {
1262 mtx_.lock();
1263
1264 listener * lst = first_listener_;
1265 bool found = false;
1266 while (lst != NULL && found == false)
1267 {
1268 if (std::strcmp(lst->get_target(), target) == 0)
1269 {
1270 found = true;
1271 }
1272 else
1273 {
1274 lst = lst->next;
1275 }
1276 }
1277
1278 if (found)
1279 {
1280 lst->dec_ref();
1281 prune_listeners();
1282 }
1283
1284 mtx_.unlock();
1285 }
1286 }
1287
do_some_work(std::size_t timeout,bool allow_outgoing_traffic,bool allow_incoming_traffic)1288 core::result channel_group::do_some_work(std::size_t timeout,
1289 bool allow_outgoing_traffic, bool allow_incoming_traffic)
1290 {
1291 core::result res;
1292 bool processed_buffered_data = false;
1293
1294 if (closing_)
1295 {
1296 res = core::bad_state;
1297 }
1298 else
1299 {
1300 // as a matter of optimization,
1301 // allow channels to flush their read buffers first
1302
1303 mtx_.lock();
1304
1305 res = core::ok;
1306 for (std::size_t i = 0;
1307 (res == core::ok) && (i != channels_num_); ++i)
1308 {
1309 channel * ch = channel_holders_[i].get_channel();
1310
1311 if ((ch != NULL) && ch->has_buffered_data())
1312 {
1313 ch->inc_ref();
1314
1315 while ((res == core::ok) && ch->has_buffered_data())
1316 {
1317 // if the channel has any buffered data, it should
1318 // not perform any I/O action that would result
1319 // in its request to self-close,
1320 // so the close_me flag is not used
1321
1322 bool dummy_close_me;
1323 res = ch->do_some_work(input, dummy_close_me);
1324 if (res == core::ok)
1325 {
1326 // if there was some buffered data,
1327 // do not attempt regular I/O after that,
1328 // users might rely on this invocation to
1329 // return cleanly without additional delays
1330
1331 processed_buffered_data = true;
1332 }
1333 }
1334
1335 ch->dec_ref();
1336 }
1337 }
1338
1339 mtx_.unlock();
1340 }
1341
1342 if ((res == core::ok) && (processed_buffered_data == false))
1343 {
1344 // note: listeners can change the set of channels
1345 // (they can add new channels to the collection),
1346 // and the work done by channels themselves can change the collection
1347 // as well (due to user activity in callbacks)
1348 // to avoid conflicts the list of channel pointers is first
1349 // copied to the shadow array and used for checking with the selector
1350 // so that the primary collection changes do not affect this process
1351
1352 // it is possible that any given channel or listener will have
1353 // its ref counter decremented in the meantime - this is checked
1354 // after the whole selector operation is finished
1355
1356 mtx_.lock();
1357
1358 // make sure the shadow channel array has proper size
1359 res = core::ok;
1360 if (shadow_channels_num_ != channels_num_)
1361 {
1362 if (shadow_channels_ != NULL)
1363 {
1364 alloc_->deallocate(shadow_channels_);
1365 }
1366
1367 shadow_channels_ = static_cast<channel * *>(
1368 alloc_->allocate(sizeof(channel *) * channels_num_));
1369 if (shadow_channels_ != NULL)
1370 {
1371 shadow_channels_num_ = channels_num_;
1372 }
1373 else
1374 {
1375 res = core::no_memory;
1376 }
1377 }
1378
1379 if (res == core::ok)
1380 {
1381 selector_.reset();
1382
1383 // freeze the shadow channel array and add channels to selector
1384 for (std::size_t i = 0;
1385 (res == core::ok) && (i != channels_num_); ++i)
1386 {
1387 channel * ch = channel_holders_[i].get_channel();
1388 if (ch != NULL)
1389 {
1390 ch->inc_ref();
1391
1392 res = selector_.add_channel(*ch,
1393 allow_outgoing_traffic, allow_incoming_traffic);
1394 }
1395
1396 shadow_channels_[i] = ch;
1397 }
1398
1399 // add listeners to selector
1400
1401 listener * lst = first_listener_;
1402 while ((res == core::ok) && (lst != NULL))
1403 {
1404 lst->inc_ref();
1405
1406 res = selector_.add_listener(*lst);
1407
1408 lst = lst->next;
1409 }
1410 }
1411
1412 // at this point all channels and listeners are "pinned"
1413 // with increased ref counters
1414
1415 #ifdef YAMI4_WITH_QNX
1416 if (res == core::ok)
1417 {
1418 // step 1. perform blocking send on all QNX channels
1419 for (std::size_t i = 0;
1420 (res == core::ok) && (i != shadow_channels_num_); ++i)
1421 {
1422 channel * ch = shadow_channels_[i];
1423
1424 if (ch != NULL)
1425 {
1426 bool close_me = false;
1427 res = ch->send_blocking_datagram(close_me);
1428
1429 if (close_me)
1430 {
1431 // see later comments in this function
1432 if (res == core::channel_closed)
1433 {
1434 res = core::ok;
1435 }
1436 else
1437 {
1438 // error condition
1439
1440 if (event_notification_callback_ != NULL)
1441 {
1442 try
1443 {
1444 event_notification_callback_(
1445 event_notification_hint_,
1446 core::connection_error,
1447 ch->get_target(), 0);
1448 }
1449 catch (...)
1450 {
1451 // ignore errors from user callback
1452 }
1453 }
1454 }
1455
1456 channel_dec_ref(i, ch);
1457 }
1458 }
1459 }
1460 }
1461 #endif // YAMI4_WITH_QNX
1462
1463 mtx_.unlock();
1464
1465 if (res == core::ok)
1466 {
1467 // step 2. wait for the work on selector
1468 // the main mutex is unlocked during the waiting phase
1469 // and waiting can be interrupted by other parts of the agent,
1470 // if there is any work to do like change in the list of channels
1471 // or listeners or a new outgoing frame in any channel
1472 // (this allows the worker thread to come back to step 1 above as well)
1473
1474 res = selector_.wait(timeout);
1475 }
1476
1477 mtx_.lock();
1478
1479 for (std::size_t i = 0;
1480 (res == core::ok) && (i != shadow_channels_num_); ++i)
1481 {
1482 channel * ch = shadow_channels_[i];
1483
1484 if (ch != NULL)
1485 {
1486 io_direction direction;
1487 if (selector_.is_channel_ready(*ch, direction))
1488 {
1489 bool close_me = false;
1490 res = ch->do_some_work(direction, close_me);
1491
1492 if (close_me)
1493 {
1494 // the channel should be closed due to error, EOF
1495 // or regular close request in the outgoing queue
1496
1497 if (res == core::channel_closed)
1498 {
1499 // the EOF condition is not an error
1500 // from the point of view of higher layers
1501 // (ie. from the point of view of what
1502 // do_some_work returns)
1503
1504 // note: the channel_closed value was
1505 // used for the callback above to propagate
1506 // proper reason to the user code
1507
1508 res = core::ok;
1509 }
1510 else
1511 {
1512 // error condition
1513
1514 if (event_notification_callback_ != NULL)
1515 {
1516 try
1517 {
1518 event_notification_callback_(
1519 event_notification_hint_,
1520 core::connection_error,
1521 ch->get_target(), 0);
1522 }
1523 catch (...)
1524 {
1525 // ignore errors from user callback
1526 }
1527 }
1528 }
1529
1530 channel_dec_ref(i, ch);
1531 }
1532 }
1533 }
1534 }
1535
1536 listener * lst = first_listener_;
1537 while ((res == core::ok) && (lst != NULL))
1538 {
1539 if (selector_.is_listener_ready(*lst))
1540 {
1541 lst->inc_ref();
1542
1543 res = lst->do_some_work();
1544
1545 lst->dec_ref();
1546 }
1547
1548 lst = lst->next;
1549 }
1550
1551 // scan the list of listeners and remove those that are
1552 // no longer referenced by any thread
1553
1554 prune_listeners();
1555
1556 // unref the channels from the shadow array
1557 for (std::size_t i = 0; i != shadow_channels_num_; ++i)
1558 {
1559 channel * ch = shadow_channels_[i];
1560 if (ch != NULL)
1561 {
1562 channel_dec_ref(i, ch);
1563 }
1564 }
1565
1566 mtx_.unlock();
1567 }
1568
1569 return res;
1570 }
1571
1572 // synchronized by caller
process_complete_incoming_frame(core::channel_descriptor cd,const char * buffer,const std::size_t buffer_size)1573 core::result channel_group::process_complete_incoming_frame(
1574 core::channel_descriptor cd,
1575 const char * buffer, const std::size_t buffer_size)
1576 {
1577 core::result res;
1578 if (closing_)
1579 {
1580 res = core::bad_state;
1581 }
1582 else
1583 {
1584 res = core::no_such_index;
1585
1586 std::size_t index;
1587 std::size_t sequence_number;
1588
1589 cd.get_details(index, sequence_number);
1590
1591 if (index < channels_num_)
1592 {
1593 channel_holder & ch_holder = channel_holders_[index];
1594 channel * ch = ch_holder.get_channel();
1595
1596 if (ch != NULL)
1597 {
1598 if (ch_holder.get_sequence_number() == sequence_number)
1599 {
1600 res = ch->process_complete_incoming_frame(
1601 buffer, buffer_size);
1602 }
1603 }
1604 }
1605 }
1606
1607 return res;
1608 }
1609
#ifdef YAMI4_WITH_QNX
void channel_group::set_default_qnx_listening_channel_id(int chid)
{
    // only the first created QNX listener is remembered as the default
    // channel for returning responses; later ones are ignored

    if (qnx_listener_channel_id_ != 0)
    {
        return;
    }

    qnx_listener_channel_id_ = chid;
}
#endif // YAMI4_WITH_QNX
1622
interrupt_work_waiter()1623 core::result channel_group::interrupt_work_waiter()
1624 {
1625 const core::result res = selector_.interrupt();
1626 return res;
1627 }
1628
get_channel_usage(int & max_allowed,int & used)1629 void channel_group::get_channel_usage(int & max_allowed, int & used)
1630 {
1631 selector_.get_channel_usage(max_allowed, used);
1632 }
1633
get_pending_outgoing_bytes(core::channel_descriptor cd,std::size_t & bytes)1634 core::result channel_group::get_pending_outgoing_bytes(
1635 core::channel_descriptor cd, std::size_t & bytes)
1636 {
1637 core::result res;
1638 if (closing_)
1639 {
1640 res = core::bad_state;
1641 }
1642 else
1643 {
1644 res = core::no_such_index;
1645
1646 std::size_t index;
1647 std::size_t sequence_number;
1648
1649 cd.get_details(index, sequence_number);
1650
1651 // do not report errors if channel does not exist
1652
1653 mtx_.lock();
1654
1655 if (index < channels_num_)
1656 {
1657 channel_holder & ch_holder = channel_holders_[index];
1658 channel * ch = ch_holder.get_channel();
1659
1660 if (ch != NULL)
1661 {
1662 if (ch_holder.get_sequence_number() == sequence_number)
1663 {
1664 bytes = ch->get_pending_outgoing_bytes();
1665
1666 res = core::ok;
1667 }
1668 }
1669 }
1670
1671 mtx_.unlock();
1672 }
1673
1674 return res;
1675 }
1676
get_pending_outgoing_bytes(const char * target,std::size_t & bytes)1677 core::result channel_group::get_pending_outgoing_bytes(
1678 const char * target, std::size_t & bytes)
1679 {
1680 core::result res;
1681 if (closing_)
1682 {
1683 res = core::bad_state;
1684 }
1685 else
1686 {
1687 std::size_t index;
1688 std::size_t dummy_sequence_number;
1689 channel * ch;
1690
1691 mtx_.lock();
1692
1693 // try to find the existing channel with the same target
1694 res = find_existing_channel(target,
1695 index, dummy_sequence_number, ch);
1696
1697 if (res == core::ok)
1698 {
1699 bytes = ch->get_pending_outgoing_bytes();
1700 }
1701
1702 mtx_.unlock();
1703 }
1704
1705 return res;
1706 }
1707
1708 // synchronized by caller
find_existing_channel(const char * target,std::size_t & index,std::size_t & sequence_number,channel * & ch) const1709 core::result channel_group::find_existing_channel(const char * target,
1710 std::size_t & index, std::size_t & sequence_number, channel * & ch) const
1711 {
1712 core::result res = core::no_such_name;
1713
1714 for (std::size_t i = 0; i != channels_num_; ++i)
1715 {
1716 const channel_holder & ch_holder = channel_holders_[i];
1717 channel * chi = ch_holder.get_channel();
1718
1719 if (chi != NULL)
1720 {
1721 if (std::strcmp(chi->get_target(), target) == 0)
1722 {
1723 index = i;
1724 sequence_number = ch_holder.get_sequence_number();
1725 ch = chi;
1726 res = core::ok;
1727 break;
1728 }
1729 }
1730 }
1731
1732 return res;
1733 }
1734
1735 // synchronized by caller
find_unused_channel(std::size_t & index,bool reserve)1736 core::result channel_group::find_unused_channel(
1737 std::size_t & index, bool reserve)
1738 {
1739 bool found = false;
1740 for (std::size_t i = 0; found == false && i != channels_num_; ++i)
1741 {
1742 channel_holder & ch_holder = channel_holders_[i];
1743 const channel * ch = ch_holder.get_channel();
1744
1745 if (ch_holder.is_reserved() == false && ch == NULL)
1746 {
1747 index = i;
1748 found = true;
1749
1750 if (reserve)
1751 {
1752 ch_holder.reserve();
1753 }
1754 }
1755 }
1756
1757 core::result res;
1758 if (found)
1759 {
1760 res = core::ok;
1761 }
1762 else
1763 {
1764 const std::size_t initial_block_size = 10;
1765
1766 std::size_t new_channels_num;
1767 if (channels_num_ < initial_block_size)
1768 {
1769 new_channels_num = initial_block_size;
1770 }
1771 else
1772 {
1773 new_channels_num = 2 * channels_num_;
1774 }
1775
1776 channel_holder * new_channel_holders = static_cast<channel_holder *>(
1777 alloc_->allocate(sizeof(channel_holder) * new_channels_num));
1778 if (new_channel_holders != NULL)
1779 {
1780 std::memcpy(new_channel_holders, channel_holders_,
1781 sizeof(channel_holder) * channels_num_);
1782
1783 for (std::size_t i = channels_num_; i != new_channels_num; ++i)
1784 {
1785 channel_holder * ch_holder = new_channel_holders + i;
1786 ch_holder->init();
1787 }
1788
1789 // pick the first slot after those that already existed
1790 index = channels_num_;
1791 if (reserve)
1792 {
1793 new_channel_holders[index].reserve();
1794 }
1795
1796 if (channel_holders_ != NULL)
1797 {
1798 alloc_->deallocate(channel_holders_);
1799 }
1800
1801 channel_holders_ = new_channel_holders;
1802 channels_num_ = new_channels_num;
1803
1804 res = core::ok;
1805 }
1806 else
1807 {
1808 res = core::no_memory;
1809 }
1810 }
1811
1812 return res;
1813 }
1814
// synchronized by caller
// Decrements the channel's reference count and, if nothing else refers
// to it any more, destroys it: cleans the holder slot, fires the
// connection_closed event and the disconnection hook, and releases the
// channel and its target string. Returns true when the channel was
// physically destroyed. NOTE: the mutex is temporarily UNLOCKED around
// the disconnection hook, so the collection may change during this call.
bool channel_group::channel_dec_ref(std::size_t index, channel * ch)
{
    bool destroyed = false;

    ch->dec_ref();
    if (ch->can_be_removed())
    {
        // free the slot first so the channel is no longer reachable
        channel_holders_[index].clean();
        // take ownership of the target string - it must outlive
        // the channel object for use in the callbacks below
        const char * target = ch->move_target();

        // the user callback can be invoked from this,
        // but the state of all structures is safe at this point

        ch->clean();
        alloc_->deallocate(ch);

        if (event_notification_callback_ != NULL)
        {
            try
            {
                event_notification_callback_(
                    event_notification_hint_,
                    core::connection_closed,
                    target, 0);
            }
            catch (...)
            {
                // ignore errors from user callback
            }
        }

        if (disconnection_hook_ != NULL)
        {
            // user callback is performed without the lock
            mtx_.unlock();

            try
            {
                disconnection_hook_(disconnection_hook_hint_,
                    target, core::channel_closed);
            }
            catch (...)
            {
                // ignore errors from user callback
            }

            mtx_.lock();
        }

        // the target string was moved out of the channel,
        // so it has to be released here
        alloc_->deallocate(target);

        destroyed = true;
    }

    return destroyed;
}
1872
1873 // synchronized by caller
prune_listeners()1874 void channel_group::prune_listeners()
1875 {
1876 listener * * removal_point = &first_listener_;
1877 while (*removal_point != NULL)
1878 {
1879 listener * lst = *removal_point;
1880 if (lst->can_be_removed())
1881 {
1882 if (event_notification_callback_ != NULL)
1883 {
1884 try
1885 {
1886 event_notification_callback_(
1887 event_notification_hint_,
1888 core::listener_removed,
1889 lst->get_target(), 0);
1890 }
1891 catch (...)
1892 {
1893 // ignore errors from user callback
1894 }
1895 }
1896
1897 *removal_point = lst->next;
1898 lst->clean();
1899 alloc_->deallocate(lst);
1900 }
1901 else
1902 {
1903 removal_point = &(lst->next);
1904 }
1905 }
1906 }
1907
1908 // synchronized by caller
generate_message_id()1909 std::size_t channel_group::generate_message_id()
1910 {
1911 return ++last_message_id_;
1912 }
1913