1 // Copyright Maciej Sobczak 2008-2019.
2 // This file is part of YAMI4.
3 //
4 // YAMI4 is free software: you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation, either version 3 of the License, or
7 // (at your option) any later version.
8 //
9 // YAMI4 is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with YAMI4.  If not, see <http://www.gnu.org/licenses/>.
16 
17 #include "channel_group.h"
18 #include "allocator.h"
19 #include "channel.h"
20 #include "channel_holder.h"
21 #include "listener.h"
22 #include "options.h"
23 #include "outgoing_frame.h"
24 #include "parameters.h"
25 #include "serialization.h"
26 #include <cstring>
27 
28 using namespace yami;
29 using namespace details;
30 
31 namespace // unnamed namespace
32 {
33 
34 const std::size_t frame_header_size = 16; // four words
35 
36 // helper for serialization, deallocates (deeply) the array of frames
deallocate_array_of_frames(allocator & alloc,char ** buffers,std::size_t * buffer_sizes,std::size_t number_of_frames)37 void deallocate_array_of_frames(allocator & alloc,
38     char * * buffers, std::size_t * buffer_sizes,
39     std::size_t number_of_frames)
40 {
41     if (buffers != NULL)
42     {
43         for (std::size_t i = 0; i != number_of_frames; ++i)
44         {
45             if (buffers[i] != NULL)
46             {
47                 alloc.deallocate(buffers[i]);
48             }
49         }
50 
51         alloc.deallocate(buffers);
52     }
53 
54     if (buffer_sizes != NULL)
55     {
56         alloc.deallocate(buffer_sizes);
57     }
58 }
59 
60 // helper for serialization, prepares the array of outgoing frames
61 // with properly allocated data segments
prepare_array_of_frames(allocator & alloc,std::size_t message_id,std::size_t header_total_buffer_size,std::size_t body_total_buffer_size,std::size_t preferred_frame_size,std::size_t & index_of_border_frame,std::size_t & body_offset_in_border_frame,std::size_t & size_of_last_frame,std::size_t & number_of_frames,char ** & buffers,std::size_t * & buffer_sizes)62 core::result prepare_array_of_frames(allocator & alloc,
63     std::size_t message_id,
64     std::size_t header_total_buffer_size,
65     std::size_t body_total_buffer_size,
66     std::size_t preferred_frame_size,
67     std::size_t & index_of_border_frame,
68     std::size_t & body_offset_in_border_frame,
69     std::size_t & size_of_last_frame,
70     std::size_t & number_of_frames,
71     char * * & buffers, std::size_t * & buffer_sizes)
72 {
73     core::result res;
74 
75     const std::size_t frame_payload_size =
76         preferred_frame_size - frame_header_size;
77 
78     const std::size_t total_buffer_size =
79         header_total_buffer_size + body_total_buffer_size;
80 
81     // this is the index of frame where the first byte of body data is located
82     index_of_border_frame = header_total_buffer_size / frame_payload_size;
83 
84     body_offset_in_border_frame =
85         header_total_buffer_size % frame_payload_size + frame_header_size;
86 
87     number_of_frames =
88         (total_buffer_size + frame_payload_size - 1) / frame_payload_size;
89 
90     size_of_last_frame =
91         (total_buffer_size - 1) % frame_payload_size + 1 + frame_header_size;
92 
93     // allocate the array of buffers
94 
95     buffers = static_cast<char * *>(
96         alloc.allocate(number_of_frames * sizeof(char *)));
97     if (buffers != NULL)
98     {
99         // clear the pointers
100         for (std::size_t i = 0; i != number_of_frames; ++i)
101         {
102             buffers[i] = NULL;
103         }
104     }
105 
106     buffer_sizes = static_cast<std::size_t *>(
107         alloc.allocate(number_of_frames * sizeof(std::size_t)));
108     if (buffers != NULL && buffer_sizes != NULL)
109     {
110 
111         // allocate the data segment for each outgoing frame
112 
113         res = core::ok;
114         for (std::size_t i = 0; res == core::ok && i != number_of_frames; ++i)
115         {
116             std::size_t size_of_this_frame;
117 
118             if (i != number_of_frames - 1)
119             {
120                 // not the last frame, ask for full preferred size
121 
122                 size_of_this_frame = preferred_frame_size;
123             }
124             else
125             {
126                 // last frame, possibly shorter
127 
128                 size_of_this_frame = size_of_last_frame;
129             }
130 
131             buffer_sizes[i] = size_of_this_frame;
132             buffers[i] = static_cast<char *>(
133                 alloc.allocate(size_of_this_frame));
134             if (buffers[i] != NULL)
135             {
136                 // fill the frame header with proper information
137 
138                 // frame number start with 1
139                 int frame_number = i + 1;
140 
141                 if (i == number_of_frames - 1)
142                 {
143                     // this is the last frame in the whole message
144                     // use negative frame number to mark this
145 
146                     frame_number *= -1;
147                 }
148 
149                 fill_outgoing_frame_header(buffers[i],
150                     message_id,
151                     frame_number,
152                     header_total_buffer_size,
153                     size_of_this_frame - frame_header_size); // frame payload
154             }
155             else
156             {
157                 // cannot allocate data segment
158 
159                 res = core::no_memory;
160             }
161         }
162     }
163     else
164     {
165         res = core::no_memory;
166     }
167 
168     return res;
169 }
170 
171 // helper for preparing array of buffer pointers and buffer sizes
172 // that will be used for serialization
173 // they are offset to take into account space for frame headers
prepare_array_of_serialize_buffer_pointers(allocator & alloc,char ** buffers,std::size_t * buffer_sizes,std::size_t number_of_frames,char ** & serialize_buffers,std::size_t * & serialize_buffer_sizes)174 core::result prepare_array_of_serialize_buffer_pointers(
175     allocator & alloc,
176     char * * buffers, std::size_t * buffer_sizes,
177     std::size_t number_of_frames,
178     char * * & serialize_buffers, std::size_t * & serialize_buffer_sizes)
179 {
180     core::result res = core::ok;
181 
182     serialize_buffers = static_cast<char * *>(
183         alloc.allocate(number_of_frames * sizeof(char *)));
184     if (serialize_buffers != NULL)
185     {
186         for (std::size_t i = 0; i != number_of_frames; ++i)
187         {
188             // the serialization buffer starts right after the frame header
189             serialize_buffers[i] = buffers[i] + frame_header_size;
190         }
191     }
192     else
193     {
194         res = core::no_memory;
195     }
196 
197     if (res == core::ok)
198     {
199         serialize_buffer_sizes = static_cast<std::size_t *>(
200             alloc.allocate(number_of_frames * sizeof(std::size_t)));
201         if (serialize_buffer_sizes != NULL)
202         {
203             for (std::size_t i = 0; i != number_of_frames; ++i)
204             {
205                 serialize_buffer_sizes[i] =
206                     buffer_sizes[i] - frame_header_size;
207             }
208         }
209         else
210         {
211             res = core::no_memory;
212         }
213     }
214 
215     return res;
216 }
217 
218 // helper function for serializing the new message and injecting the
219 // output frames into the output queue of the given channel
serialize_and_post(allocator & alloc,channel & ch,std::size_t message_id,std::size_t priority,const core::serializable & message_header,const core::serializable & message_body,bool & first_frame,core::message_progress_function progress_callback,void * progress_hint)220 core::result serialize_and_post(allocator & alloc,
221     channel & ch, std::size_t message_id, std::size_t priority,
222     const core::serializable & message_header,
223     const core::serializable & message_body,
224     bool & first_frame,
225     core::message_progress_function progress_callback,
226     void * progress_hint)
227 {
228     // note:
229     // There are two arrays in use:
230     // 1. the array of pointers to buffers that will be used as complete
231     //    frames and will be injected to the outgoing queue
232     //    of the given channel - this array owns the buffers
233     // 2. the array of pointers to buffers that are used for serialization
234     //    these pointers are offset with regard to those in array 1. to
235     //    take into account the necessary space for frame headers
236     //    this array does not own the actual buffers
237 
238     core::result res;
239 
240     const std::size_t preferred_frame_size = ch.get_frame_size();
241 
242     std::size_t header_total_buffer_size;
243     std::size_t body_total_buffer_size;
244 
245     res = message_header.get_serialize_buffer_size(header_total_buffer_size);
246     if (res == core::ok)
247     {
248         res = message_body.get_serialize_buffer_size(body_total_buffer_size);
249     }
250 
251     // prepare the array of buffers for all outgoing frames
252 
253     std::size_t number_of_frames;
254     std::size_t index_of_border_frame;
255     std::size_t body_offset_in_border_frame;
256     std::size_t size_of_last_frame;
257     char * * buffers = NULL;
258     std::size_t * buffer_sizes = NULL;
259     if (res == core::ok)
260     {
261         res = prepare_array_of_frames(alloc, message_id,
262             header_total_buffer_size, body_total_buffer_size,
263             preferred_frame_size,
264             index_of_border_frame, body_offset_in_border_frame,
265             size_of_last_frame, number_of_frames,
266             buffers, buffer_sizes);
267     }
268 
269     // prepare the array of pointers for serialization
270 
271     char * * serialize_buffers = NULL;
272     std::size_t * serialize_buffer_sizes = NULL;
273     if (res == core::ok)
274     {
275         res = prepare_array_of_serialize_buffer_pointers(alloc,
276             buffers, buffer_sizes, number_of_frames,
277             serialize_buffers, serialize_buffer_sizes);
278     }
279 
280     // serialize the message header
281     if (res == core::ok)
282     {
283         // note: it is allowed to provide more buffers than are actually
284         // needed for serialization
285 
286         res = message_header.serialize(
287             serialize_buffers, serialize_buffer_sizes, number_of_frames);
288     }
289 
290     // serialize the message body
291     if (res == core::ok)
292     {
293         // the buffer pointer that will be used as the first one
294         // for serializing the message body needs to be ammended
295         // in order to take into account the tail of message header data
296 
297         serialize_buffers[index_of_border_frame] =
298             buffers[index_of_border_frame] + body_offset_in_border_frame;
299         serialize_buffer_sizes[index_of_border_frame] =
300             buffer_sizes[index_of_border_frame] - body_offset_in_border_frame;
301 
302         res = message_body.serialize(
303             serialize_buffers + index_of_border_frame,
304             serialize_buffer_sizes + index_of_border_frame,
305             number_of_frames - index_of_border_frame);
306     }
307 
308     // post new outgoing frames to the given channel
309 
310     if (res == core::ok)
311     {
312         res = ch.post(priority,
313             buffers, buffer_sizes, number_of_frames,
314             first_frame,
315             progress_callback, progress_hint);
316     }
317 
318     // clean all helper buffers
319     if (serialize_buffers != NULL)
320     {
321         alloc.deallocate(serialize_buffers);
322     }
323 
324     if (serialize_buffer_sizes != NULL)
325     {
326         alloc.deallocate(serialize_buffer_sizes);
327     }
328 
329     if (res == core::ok)
330     {
331         // actual buffers were transferred to the channel,
332         // do not deallocate them
333         alloc.deallocate(buffers);
334         alloc.deallocate(buffer_sizes);
335     }
336     else
337     {
338         // buffers were not transferred or there was some other error
339         // -> deallocate deeply the whole structure
340         deallocate_array_of_frames(alloc,
341             buffers, buffer_sizes, number_of_frames);
342     }
343 
344     return res;
345 }
346 
347 } // unnamed namespace
348 
init(allocator & alloc,const core::parameters * configuration_options,core::incoming_message_dispatch_function dispatch_callback,void * dispatch_hint,core::closed_connection_function disconnection_hook,void * disconnection_hook_hint)349 core::result channel_group::init(allocator & alloc,
350     const core::parameters * configuration_options,
351     core::incoming_message_dispatch_function dispatch_callback,
352     void * dispatch_hint,
353     core::closed_connection_function disconnection_hook,
354     void * disconnection_hook_hint)
355 {
356     channel_holders_ = NULL;
357     channels_num_ = 0;
358     shadow_channels_ = NULL;
359     shadow_channels_num_ = 0;
360     first_listener_ = NULL;
361 #ifdef YAMI4_WITH_QNX
362     qnx_listener_channel_id_ = 0;
363 #endif // YAMI4_WITH_QNX
364     alloc_ = &alloc;
365     closing_ = false;
366     last_message_id_ = 0;
367     incoming_message_callback_ = dispatch_callback;
368     incoming_message_hint_ = dispatch_hint;
369     disconnection_hook_ = disconnection_hook;
370     disconnection_hook_hint_ = disconnection_hook_hint;
371     event_notification_callback_ = NULL;
372     io_error_callback_ = NULL;
373 
374     configuration_options_.init(configuration_options);
375     mtx_.init();
376 
377     core::result res = selector_.init(alloc);
378 
379 #ifdef YAMI4_WITH_OPEN_SSL
380     ssl_ctx_ = NULL;
381     if (res == core::ok)
382     {
383         // the SSL context is always created;
384         // but the incoming SSL connections will be possible
385         // only with proper certificate
386 
387         (void)SSL_library_init();
388 
389         ssl_ctx_ = SSL_CTX_new(SSLv23_method());
390         if (ssl_ctx_ != NULL)
391         {
392             SSL_CTX_set_verify(ssl_ctx_, SSL_VERIFY_NONE, NULL);
393 
394             if (configuration_options_.ssl_certificate_file[0] != '\0' &&
395                 configuration_options_.ssl_private_key_file[0] != '\0')
396             {
397                 int ssl_ret1 = SSL_CTX_use_certificate_file(ssl_ctx_,
398                     configuration_options_.ssl_certificate_file,
399                     SSL_FILETYPE_PEM);
400                 int ssl_ret2 = SSL_CTX_use_PrivateKey_file(ssl_ctx_,
401                     configuration_options_.ssl_private_key_file,
402                     SSL_FILETYPE_PEM);
403 
404                 if (ssl_ret1 != 1 || ssl_ret2 != 1)
405                 {
406                     SSL_CTX_free(ssl_ctx_);
407                     ssl_ctx_ = NULL;
408 
409                     res = core::unexpected_value;
410                 }
411             }
412         }
413         else
414         {
415             res = core::not_enough_space;
416         }
417     }
418 #endif // YAMI4_WITH_OPEN_SSL
419 
420     return res;
421 }
422 
install_event_notifications(core::event_notification_function event_notification_callback,void * event_notification_hint)423 void channel_group::install_event_notifications(
424     core::event_notification_function event_notification_callback,
425     void * event_notification_hint)
426 {
427     event_notification_callback_ = event_notification_callback;
428     event_notification_hint_ = event_notification_hint;
429 }
430 
install_io_error_logger(core::io_error_function io_error_callback,void * io_error_callback_hint)431 void channel_group::install_io_error_logger(
432     core::io_error_function io_error_callback,
433     void * io_error_callback_hint)
434 {
435     io_error_callback_ = io_error_callback;
436     io_error_callback_hint_ = io_error_callback_hint;
437 
438     selector_.install_io_error_logger(
439         io_error_callback, io_error_callback_hint);
440 }
441 
clean(bool uses_private_area)442 void channel_group::clean(bool uses_private_area)
443 {
444     // note: if uses_private_area is false, then the memory needs to be
445     // deallocated, because it was acquired from the global free store
446     // otherwise (uses_private_area == true) all structures were
447     // created in the private storage and need not be deallocated
448 
449     // in any case, all files and sockets need to be physically closed
450 
451     closing_ = true;
452 
453     mtx_.lock();
454 
455     // clean the channels
456     for (std::size_t i = 0; i != channels_num_; ++i)
457     {
458         channel_holder & ch_holder = channel_holders_[i];
459 
460         channel * ch = ch_holder.get_channel();
461         if (ch != NULL)
462         {
463             const char * target = ch->move_target();
464 
465             if (uses_private_area)
466             {
467                 // it is enough to close the physical connection
468                 ch->close_connection();
469             }
470             else
471             {
472                 // force full cleanup of the channel
473                 ch->clean();
474                 alloc_->deallocate(ch);
475             }
476 
477             if (event_notification_callback_ != NULL)
478             {
479                 try
480                 {
481                     event_notification_callback_(
482                         event_notification_hint_,
483                         core::connection_closed,
484                         target, 0);
485                 }
486                 catch (...)
487                 {
488                     // ignore errors from user callback
489                 }
490             }
491 
492             if (disconnection_hook_ != NULL)
493             {
494                 mtx_.unlock();
495 
496                 try
497                 {
498                     disconnection_hook_(disconnection_hook_hint_,
499                         target, core::channel_closed);
500                 }
501                 catch (...)
502                 {
503                     // ignore errors from user callback
504                 }
505 
506                 mtx_.lock();
507             }
508 
509             if (uses_private_area == false)
510             {
511                 alloc_->deallocate(target);
512             }
513         }
514     }
515 
516     if (uses_private_area == false)
517     {
518         alloc_->deallocate(channel_holders_);
519         alloc_->deallocate(shadow_channels_);
520     }
521 
522     // clean the list of listeners
523     while (first_listener_ != NULL)
524     {
525         listener * next = first_listener_->next;
526 
527         if (event_notification_callback_ != NULL)
528         {
529             try
530             {
531                 event_notification_callback_(
532                     event_notification_hint_,
533                     core::listener_removed,
534                     first_listener_->get_target(), 0);
535             }
536             catch (...)
537             {
538                 // ignore errors from user callback
539             }
540         }
541 
542         if (uses_private_area)
543         {
544             // it is enough to close the listening socket
545             first_listener_->close_resource();
546         }
547         else
548         {
549             // force full cleanup
550             first_listener_->clean();
551             alloc_->deallocate(first_listener_);
552         }
553 
554         first_listener_ = next;
555     }
556 
557     mtx_.unlock();
558 
559     // clean the selector (it has internal pipe that needs to be closed)
560 
561     selector_.clean();
562     mtx_.clean();
563 
564     if (event_notification_callback_ != NULL)
565     {
566         try
567         {
568             event_notification_callback_(
569                 event_notification_hint_,
570                 core::agent_closed,
571                 NULL, 0);
572         }
573         catch (...)
574         {
575             // ignore errors from user callback
576         }
577     }
578 
579 #ifdef YAMI4_WITH_OPEN_SSL
580     if (ssl_ctx_ != NULL)
581     {
582         SSL_CTX_free(ssl_ctx_);
583         ssl_ctx_ = NULL;
584     }
585 #endif // YAMI4_WITH_OPEN_SSL
586 }
587 
open(const char * target,core::channel_descriptor & new_channel,bool & created_new_channel,bool do_lock,const core::parameters * overriding_options)588 core::result channel_group::open(const char * target,
589     core::channel_descriptor & new_channel,
590     bool & created_new_channel, bool do_lock,
591     const core::parameters * overriding_options)
592 {
593     // note:
594     // This function can be called by multiple threads.
595     // In order to allow for higher concurrency it can lead to
596     // multiple connections being created for the same target,
597     // although only one such connection is ultimately kept open.
598 
599     core::result res;
600     if (closing_)
601     {
602         res = core::bad_state;
603     }
604     else
605     {
606         std::size_t index;
607         std::size_t sequence_number;
608         channel * existing_channel;
609 
610         if (do_lock)
611         {
612             mtx_.lock();
613         }
614 
615         // try to find the existing channel with the same target
616         res = find_existing_channel(
617             target, index, sequence_number, existing_channel);
618 
619         if (do_lock)
620         {
621             mtx_.unlock();
622         }
623 
624         if (res == core::ok)
625         {
626             new_channel = core::channel_descriptor(index, sequence_number);
627             created_new_channel = false;
628         }
629         else
630         {
631             // no channel with this target exists,
632             // initialize a new channel in some empty slot
633 
634             channel * ch = static_cast<channel *>(
635                 alloc_->allocate(sizeof(channel)));
636 
637             if (ch != NULL)
638             {
639                 // initialize channel outside of the critical section
640                 // - this can lead to multiple channels being open
641 
642                 res = ch->init(*alloc_, mtx_,
643                     configuration_options_, overriding_options,
644                     target,
645                     incoming_message_callback_, incoming_message_hint_,
646                     event_notification_callback_, event_notification_hint_,
647                     io_error_callback_, io_error_callback_hint_);
648 
649 #ifdef YAMI4_WITH_QNX
650                 if (ch->get_protocol() == proto_qnx)
651                 {
652                     ch->set_default_qnx_listening_channel_id(
653                         qnx_listener_channel_id_);
654 
655                     ch->set_selector(selector_);
656                 }
657 #endif // YAMI4_WITH_QNX
658 
659 #ifdef YAMI4_WITH_OPEN_SSL
660                 if (res == core::ok)
661                 {
662                     protocol prot = ch->get_protocol();
663                     if (prot == proto_tcps)
664                     {
665                         SSL * ssl = SSL_new(ssl_ctx_);
666                         if (ssl != NULL)
667                         {
668                             io_descriptor_type fd = ch->get_io_descriptor();
669 
670                             int ssl_ret = SSL_set_fd(ssl, fd);
671                             if (ssl_ret == 1)
672                             {
673                                 ch->set_client_ssl(ssl);
674                             }
675                             else
676                             {
677                                 SSL_free(ssl);
678 
679                                 ch->clean();
680 
681                                 res = core::io_error;
682                             }
683                         }
684                         else
685                         {
686                             ch->clean();
687 
688                             res = core::not_enough_space;
689                         }
690                     }
691                 }
692 #endif // YAMI4_WITH_OPEN_SSL
693 
694                 if (res == core::ok)
695                 {
696                     bool channel_added = false;
697 
698                     if (do_lock)
699                     {
700                         mtx_.lock();
701                     }
702 
703                     // re-check to verify that the channel
704                     // is not yet in the set
705                     res = find_existing_channel(
706                         target, index, sequence_number, existing_channel);
707 
708                     if (res == core::ok)
709                     {
710                         // the channel with this name is in the set
711                         // (probably just added by another thread)
712 
713                         // return the descriptor for that existing channel
714 
715                         new_channel = core::channel_descriptor(
716                             index, sequence_number);
717                         created_new_channel = false;
718                     }
719                     else
720                     {
721                         res = find_unused_channel(index, true);
722                         if (res == core::ok)
723                         {
724                             channel_holders_[index].set(ch, sequence_number);
725                             new_channel = core::channel_descriptor(
726                                 index, sequence_number);
727 
728                             created_new_channel = true;
729                             channel_added = true;
730 
731                             if (event_notification_callback_ != NULL)
732                             {
733                                 try
734                                 {
735                                     event_notification_callback_(
736                                         event_notification_hint_,
737                                         core::outgoing_connection_open,
738                                         target, 0);
739                                 }
740                                 catch (...)
741                                 {
742                                     // ignore errors from user callback
743                                 }
744                             }
745                         }
746                     }
747 
748                     if (do_lock)
749                     {
750                         mtx_.unlock();
751                     }
752 
753                     if (channel_added)
754                     {
755                         // notify the selector so that it can
756                         // take the new channel into account
757 
758                         res = selector_.interrupt();
759                     }
760                     else
761                     {
762                         // channel was initialized,
763                         // but not included in the set
764 
765                         ch->clean();
766                         alloc_->deallocate(ch);
767                     }
768                 }
769                 else
770                 {
771                     // channel was allocated but not initialized properly
772 
773                     alloc_->deallocate(ch);
774                 }
775             }
776             else
777             {
778                 res = core::no_memory;
779             }
780         }
781     }
782 
783     return res;
784 }
785 
is_open(const char * target,core::channel_descriptor & existing_channel) const786 core::result channel_group::is_open(const char * target,
787     core::channel_descriptor & existing_channel) const
788 {
789     core::result res;
790     if (closing_)
791     {
792         res = core::bad_state;
793     }
794     else
795     {
796         std::size_t index;
797         std::size_t sequence_number;
798         channel * dummy_ch;
799 
800         mtx_.lock();
801 
802         res = find_existing_channel(
803             target, index, sequence_number, dummy_ch);
804 
805         if (res == core::ok)
806         {
807             existing_channel =
808                 core::channel_descriptor(index, sequence_number);
809         }
810 
811         mtx_.unlock();
812     }
813 
814     return res;
815 }
816 
817 // synchronized by caller
add_existing(char * target,io_descriptor_type fd,protocol prot,std::size_t preferred_frame_size,core::channel_descriptor & new_descriptor,channel * & new_channel)818 core::result channel_group::add_existing(
819     char * target, io_descriptor_type fd, protocol prot,
820     std::size_t preferred_frame_size,
821     core::channel_descriptor & new_descriptor,
822     channel * & new_channel)
823 {
824     core::result res;
825     if (closing_)
826     {
827         res = core::bad_state;
828     }
829     else
830     {
831         // make sure that no channel for this target already exists
832 
833         std::size_t index;
834         std::size_t sequence_number;
835         channel * dummy_ch;
836 
837         // try to find the existing channel with the same target
838         // (it should not exist)
839         res = find_existing_channel(target,
840             index, sequence_number, dummy_ch);
841 
842         if (res == core::ok)
843         {
844             // reject the new channel, there is some I/O hazard
845             // (it is not normal for the listener to accept a channel
846             // for the target that already exists)
847             res = core::io_error;
848         }
849         else
850         {
851             // initialize a new channel in some empty slot
852 
853             res = find_unused_channel(index, false);
854             if (res == core::ok)
855             {
856                 channel * ch = static_cast<channel *>(
857                     alloc_->allocate(sizeof(channel)));
858                 if (ch != NULL)
859                 {
860                     ch->init(*alloc_, mtx_, configuration_options_,
861                         target, fd, prot, preferred_frame_size,
862                         incoming_message_callback_, incoming_message_hint_,
863                         event_notification_callback_,
864                         event_notification_hint_,
865                         io_error_callback_, io_error_callback_hint_);
866 
867 #ifdef YAMI4_WITH_QNX
868                     if (ch->get_protocol() == proto_qnx)
869                     {
870                         ch->set_default_qnx_listening_channel_id(
871                             qnx_listener_channel_id_);
872 
873                         ch->set_selector(selector_);
874                     }
875 #endif // YAMI4_WITH_QNX
876 
877                     channel_holders_[index].set(ch, sequence_number);
878 
879                     new_descriptor =
880                         core::channel_descriptor(index, sequence_number);
881 
882                     new_channel = ch;
883 
884                     if (event_notification_callback_ != NULL)
885                     {
886                         try
887                         {
888                             event_notification_callback_(
889                                 event_notification_hint_,
890                                 core::incoming_connection_open,
891                                 target, 0);
892                         }
893                         catch (...)
894                         {
895                             // ignore errors from user callback
896                         }
897                     }
898                 }
899                 else
900                 {
901                     res = core::no_memory;
902                 }
903             }
904         }
905     }
906 
907     return res;
908 }
909 
910 #ifdef YAMI4_WITH_OPEN_SSL
911 // synchronized by caller
add_existing_ssl(char * target,io_descriptor_type fd,protocol prot,std::size_t preferred_frame_size,core::channel_descriptor & new_descriptor)912 core::result channel_group::add_existing_ssl(
913     char * target, io_descriptor_type fd, protocol prot,
914     std::size_t preferred_frame_size,
915     core::channel_descriptor & new_descriptor)
916 {
917     core::result res;
918 
919     SSL * ssl = SSL_new(ssl_ctx_);
920     if (ssl != NULL)
921     {
922         int ssl_ret = SSL_set_fd(ssl, fd);
923         if (ssl_ret == 1)
924         {
925             channel * new_channel;
926             res = add_existing(target,
927                 fd, prot, preferred_frame_size, new_descriptor, new_channel);
928             if (res == core::ok)
929             {
930                 new_channel->set_server_ssl(ssl);
931             }
932         }
933         else
934         {
935             SSL_free(ssl);
936 
937             res = core::io_error;
938         }
939     }
940     else
941     {
942         res = core::not_enough_space;
943     }
944 
945     return res;
946 }
947 #endif // YAMI4_WITH_OPEN_SSL
948 
post(core::channel_descriptor cd,const core::serializable & message_header,const core::serializable & message_body,std::size_t priority,core::message_progress_function progress_callback,void * progress_hint)949 core::result channel_group::post(
950     core::channel_descriptor cd,
951     const core::serializable & message_header,
952     const core::serializable & message_body,
953     std::size_t priority,
954     core::message_progress_function progress_callback,
955     void * progress_hint)
956 {
957     core::result res;
958     bool first_frame;
959     if (closing_)
960     {
961         res = core::bad_state;
962     }
963     else
964     {
965         res = core::no_such_index;
966 
967         std::size_t index;
968         std::size_t sequence_number;
969 
970         cd.get_details(index, sequence_number);
971 
972         mtx_.lock();
973 
974         if (index < channels_num_)
975         {
976             channel_holder & ch_holder = channel_holders_[index];
977             channel * ch = ch_holder.get_channel();
978 
979             if (ch != NULL)
980             {
981                 if (ch_holder.get_sequence_number() == sequence_number)
982                 {
983                     const std::size_t message_id = generate_message_id();
984 
985                     res = serialize_and_post(*alloc_,
986                         *ch, message_id, priority,
987                         message_header, message_body,
988                         first_frame,
989                         progress_callback, progress_hint);
990                 }
991             }
992         }
993 
994         mtx_.unlock();
995     }
996 
997     if (res == core::ok && first_frame)
998     {
999         // interrupt selector so that it can perform an output operation
1000 
1001         res = selector_.interrupt();
1002     }
1003 
1004     return res;
1005 }
1006 
post(const char * target,const core::serializable & message_header,const core::serializable & message_body,std::size_t priority,core::message_progress_function progress_callback,void * progress_hint)1007 core::result channel_group::post(
1008     const char * target,
1009     const core::serializable & message_header,
1010     const core::serializable & message_body,
1011     std::size_t priority,
1012     core::message_progress_function progress_callback,
1013     void * progress_hint)
1014 {
1015     core::result res;
1016     bool first_frame;
1017     if (closing_)
1018     {
1019         res = core::bad_state;
1020     }
1021     else
1022     {
1023         std::size_t index;
1024         std::size_t dummy_sequence_number;
1025         channel * ch;
1026 
1027         mtx_.lock();
1028 
1029         // try to find the existing channel with the same target
1030         res = find_existing_channel(
1031             target, index, dummy_sequence_number, ch);
1032 
1033         if (res == core::ok)
1034         {
1035             const std::size_t message_id = generate_message_id();
1036 
1037             res = serialize_and_post(*alloc_,
1038                 *ch, message_id, priority, message_header, message_body,
1039                 first_frame,
1040                 progress_callback, progress_hint);
1041         }
1042 
1043         mtx_.unlock();
1044     }
1045 
1046     if (res == core::ok && first_frame)
1047     {
1048         // interrupt selector so that it can perform an output operation
1049 
1050         res = selector_.interrupt();
1051     }
1052 
1053     return res;
1054 }
1055 
close(core::channel_descriptor cd,bool hard_close,std::size_t priority)1056 core::result channel_group::close(
1057     core::channel_descriptor cd, bool hard_close, std::size_t priority)
1058 {
1059     core::result res;
1060     if (closing_)
1061     {
1062         res = core::bad_state;
1063     }
1064     else
1065     {
1066         std::size_t index;
1067         std::size_t sequence_number;
1068 
1069         cd.get_details(index, sequence_number);
1070 
1071         // do not report errors if channel does not exist
1072 
1073         mtx_.lock();
1074 
1075         res = core::ok;
1076         if (index < channels_num_)
1077         {
1078             channel_holder & ch_holder = channel_holders_[index];
1079             channel * ch = ch_holder.get_channel();
1080 
1081             if (ch != NULL)
1082             {
1083                 if (ch_holder.get_sequence_number() == sequence_number)
1084                 {
1085                     if (hard_close)
1086                     {
1087                         res = do_hard_close(ch, index);
1088                     }
1089                     else
1090                     {
1091                         res = do_close(ch, priority, index);
1092                     }
1093                 }
1094             }
1095         }
1096 
1097         mtx_.unlock();
1098     }
1099 
1100     return res;
1101 }
1102 
close(const char * target,bool hard_close,std::size_t priority)1103 core::result channel_group::close(const char * target,
1104     bool hard_close, std::size_t priority)
1105 {
1106     core::result res;
1107     if (closing_)
1108     {
1109         res = core::bad_state;
1110     }
1111     else
1112     {
1113         std::size_t index;
1114         std::size_t dummy_sequence_number;
1115         channel * ch;
1116 
1117         mtx_.lock();
1118 
1119         // try to find the existing channel with the same target
1120         res = find_existing_channel(target,
1121             index, dummy_sequence_number, ch);
1122 
1123         if (res == core::ok)
1124         {
1125             if (hard_close)
1126             {
1127                 res = do_hard_close(ch, index);
1128             }
1129             else
1130             {
1131                 res = do_close(ch, priority, index);
1132             }
1133         }
1134         else if (res == core::no_such_name)
1135         {
1136             // do not report errors if channel does not exist
1137             res = core::ok;
1138         }
1139 
1140         mtx_.unlock();
1141     }
1142 
1143     return res;
1144 }
1145 
1146 // synchronized by caller
do_close(channel * ch,std::size_t priority,std::size_t index)1147 core::result channel_group::do_close(
1148     channel * ch, std::size_t priority, std::size_t index)
1149 {
1150     bool close_me = false;
1151     core::result res = ch->post_close(priority, close_me);
1152 
1153     if (res == core::ok && close_me)
1154     {
1155         // immediate close
1156 
1157         res = do_hard_close(ch, index);
1158     }
1159 
1160     return res;
1161 }
1162 
1163 // synchronized by caller
do_hard_close(channel * ch,std::size_t index)1164 core::result channel_group::do_hard_close(channel * ch, std::size_t index)
1165 {
1166     core::result res = core::ok;
1167 
1168     bool destroyed = channel_dec_ref(index, ch);
1169     if (destroyed == false)
1170     {
1171         // channel was not destroyed,
1172         // because there are more references to it
1173         // -> wake up the worker, so it can finalize the process
1174 
1175         res = interrupt_work_waiter();
1176     }
1177 
1178     return res;
1179 }
1180 
add_listener(const char * target,core::new_incoming_connection_function connection_hook,void * connection_hook_hint,const char ** resolved_target)1181 core::result channel_group::add_listener(const char * target,
1182     core::new_incoming_connection_function connection_hook,
1183     void * connection_hook_hint,
1184     const char * * resolved_target)
1185 {
1186     core::result res;
1187     if (closing_)
1188     {
1189         res = core::bad_state;
1190     }
1191     else
1192     {
1193         listener * new_listener = static_cast<listener *>(
1194             alloc_->allocate(sizeof(listener)));
1195 
1196         // preparation of the listener is potentially blocking
1197         // and is performed with mutex unlocked
1198 
1199         if (new_listener != NULL)
1200         {
1201             new_listener->init(*alloc_, *this, mtx_,
1202                 connection_hook, connection_hook_hint,
1203                 io_error_callback_, io_error_callback_hint_);
1204 
1205             res = new_listener->prepare(target);
1206             if (res == core::ok)
1207             {
1208                 const char * res_target = new_listener->get_target();
1209 
1210                 if (resolved_target != NULL)
1211                 {
1212                     *resolved_target = res_target;
1213                 }
1214 
1215                 // add new listener to the list
1216 
1217                 mtx_.lock();
1218 
1219                 new_listener->next = first_listener_;
1220                 first_listener_ = new_listener;
1221 
1222                 mtx_.unlock();
1223 
1224                 if (event_notification_callback_ != NULL)
1225                 {
1226                     try
1227                     {
1228                         event_notification_callback_(
1229                             event_notification_hint_,
1230                             core::listener_added,
1231                             res_target, 0);
1232                     }
1233                     catch (...)
1234                     {
1235                         // ignore errors from user callback
1236                     }
1237                 }
1238 
1239                 // notify the selector so that it can
1240                 // take the new listener into account
1241 
1242                 res = selector_.interrupt();
1243             }
1244             else
1245             {
1246                 alloc_->deallocate(new_listener);
1247             }
1248         }
1249         else
1250         {
1251             res = core::no_memory;
1252         }
1253     }
1254 
1255     return res;
1256 }
1257 
remove_listener(const char * target)1258 void channel_group::remove_listener(const char * target)
1259 {
1260     if (closing_ == false)
1261     {
1262         mtx_.lock();
1263 
1264         listener * lst = first_listener_;
1265         bool found = false;
1266         while (lst != NULL && found == false)
1267         {
1268             if (std::strcmp(lst->get_target(), target) == 0)
1269             {
1270                 found = true;
1271             }
1272             else
1273             {
1274                 lst = lst->next;
1275             }
1276         }
1277 
1278         if (found)
1279         {
1280             lst->dec_ref();
1281             prune_listeners();
1282         }
1283 
1284         mtx_.unlock();
1285     }
1286 }
1287 
do_some_work(std::size_t timeout,bool allow_outgoing_traffic,bool allow_incoming_traffic)1288 core::result channel_group::do_some_work(std::size_t timeout,
1289     bool allow_outgoing_traffic, bool allow_incoming_traffic)
1290 {
1291     core::result res;
1292     bool processed_buffered_data = false;
1293 
1294     if (closing_)
1295     {
1296         res = core::bad_state;
1297     }
1298     else
1299     {
1300         // as a matter of optimization,
1301         // allow channels to flush their read buffers first
1302 
1303         mtx_.lock();
1304 
1305         res = core::ok;
1306         for (std::size_t i = 0;
1307             (res == core::ok) && (i != channels_num_); ++i)
1308         {
1309             channel * ch = channel_holders_[i].get_channel();
1310 
1311             if ((ch != NULL) && ch->has_buffered_data())
1312             {
1313                 ch->inc_ref();
1314 
1315                 while ((res == core::ok) && ch->has_buffered_data())
1316                 {
1317                     // if the channel has any buffered data, it should
1318                     // not perform any I/O action that would result
1319                     // in its request to self-close,
1320                     // so the close_me flag is not used
1321 
1322                     bool dummy_close_me;
1323                     res = ch->do_some_work(input, dummy_close_me);
1324                     if (res == core::ok)
1325                     {
1326                         // if there was some buffered data,
1327                         // do not attempt regular I/O after that,
1328                         // users might rely on this invocation to
1329                         // return cleanly without additional delays
1330 
1331                         processed_buffered_data = true;
1332                     }
1333                 }
1334 
1335                 ch->dec_ref();
1336             }
1337         }
1338 
1339         mtx_.unlock();
1340     }
1341 
1342     if ((res == core::ok) && (processed_buffered_data == false))
1343     {
1344         // note: listeners can change the set of channels
1345         // (they can add new channels to the collection),
1346         // and the work done by channels themselves can change the collection
1347         // as well (due to user activity in callbacks)
1348         // to avoid conflicts the list of channel pointers is first
1349         // copied to the shadow array and used for checking with the selector
1350         // so that the primary collection changes do not affect this process
1351 
1352         // it is possible that any given channel or listener will have
1353         // its ref counter decremented in the meantime - this is checked
1354         // after the whole selector operation is finished
1355 
1356         mtx_.lock();
1357 
1358         // make sure the shadow channel array has proper size
1359         res = core::ok;
1360         if (shadow_channels_num_ != channels_num_)
1361         {
1362             if (shadow_channels_ != NULL)
1363             {
1364                 alloc_->deallocate(shadow_channels_);
1365             }
1366 
1367             shadow_channels_ = static_cast<channel * *>(
1368                 alloc_->allocate(sizeof(channel *) * channels_num_));
1369             if (shadow_channels_ != NULL)
1370             {
1371                 shadow_channels_num_ = channels_num_;
1372             }
1373             else
1374             {
1375                 res = core::no_memory;
1376             }
1377         }
1378 
1379         if (res == core::ok)
1380         {
1381             selector_.reset();
1382 
1383             // freeze the shadow channel array and add channels to selector
1384             for (std::size_t i = 0;
1385                  (res == core::ok) && (i != channels_num_); ++i)
1386             {
1387                 channel * ch = channel_holders_[i].get_channel();
1388                 if (ch != NULL)
1389                 {
1390                     ch->inc_ref();
1391 
1392                     res = selector_.add_channel(*ch,
1393                         allow_outgoing_traffic, allow_incoming_traffic);
1394                 }
1395 
1396                 shadow_channels_[i] = ch;
1397             }
1398 
1399             // add listeners to selector
1400 
1401             listener * lst = first_listener_;
1402             while ((res == core::ok) && (lst != NULL))
1403             {
1404                 lst->inc_ref();
1405 
1406                 res = selector_.add_listener(*lst);
1407 
1408                 lst = lst->next;
1409             }
1410         }
1411 
1412         // at this point all channels and listeners are "pinned"
1413         // with increased ref counters
1414 
1415 #ifdef YAMI4_WITH_QNX
1416         if (res == core::ok)
1417         {
1418             // step 1. perform blocking send on all QNX channels
1419             for (std::size_t i = 0;
1420                  (res == core::ok) && (i != shadow_channels_num_); ++i)
1421             {
1422                 channel * ch = shadow_channels_[i];
1423 
1424                 if (ch != NULL)
1425                 {
1426                     bool close_me = false;
1427                     res = ch->send_blocking_datagram(close_me);
1428 
1429                     if (close_me)
1430                     {
1431                         // see later comments in this function
1432                         if (res == core::channel_closed)
1433                         {
1434                             res = core::ok;
1435                         }
1436                         else
1437                         {
1438                             // error condition
1439 
1440                             if (event_notification_callback_ != NULL)
1441                             {
1442                                 try
1443                                 {
1444                                     event_notification_callback_(
1445                                         event_notification_hint_,
1446                                         core::connection_error,
1447                                         ch->get_target(), 0);
1448                                 }
1449                                 catch (...)
1450                                 {
1451                                     // ignore errors from user callback
1452                                 }
1453                             }
1454                         }
1455 
1456                         channel_dec_ref(i, ch);
1457                     }
1458                 }
1459             }
1460         }
1461 #endif // YAMI4_WITH_QNX
1462 
1463         mtx_.unlock();
1464 
1465         if (res == core::ok)
1466         {
1467             // step 2. wait for the work on selector
1468             // the main mutex is unlocked during the waiting phase
1469             // and waiting can be interrupted by other parts of the agent,
1470             // if there is any work to do like change in the list of channels
1471             // or listeners or a new outgoing frame in any channel
1472             // (this allows the worker thread to come back to step 1 above as well)
1473 
1474             res = selector_.wait(timeout);
1475         }
1476 
1477         mtx_.lock();
1478 
1479         for (std::size_t i = 0;
1480              (res == core::ok) && (i != shadow_channels_num_); ++i)
1481         {
1482             channel * ch = shadow_channels_[i];
1483 
1484             if (ch != NULL)
1485             {
1486                 io_direction direction;
1487                 if (selector_.is_channel_ready(*ch, direction))
1488                 {
1489                     bool close_me = false;
1490                     res = ch->do_some_work(direction, close_me);
1491 
1492                     if (close_me)
1493                     {
1494                         // the channel should be closed due to error, EOF
1495                         // or regular close request in the outgoing queue
1496 
1497                         if (res == core::channel_closed)
1498                         {
1499                             // the EOF condition is not an error
1500                             // from the point of view of higher layers
1501                             // (ie. from the point of view of what
1502                             // do_some_work returns)
1503 
1504                             // note: the channel_closed value was
1505                             // used for the callback above to propagate
1506                             // proper reason to the user code
1507 
1508                             res = core::ok;
1509                         }
1510                         else
1511                         {
1512                             // error condition
1513 
1514                             if (event_notification_callback_ != NULL)
1515                             {
1516                                 try
1517                                 {
1518                                     event_notification_callback_(
1519                                         event_notification_hint_,
1520                                         core::connection_error,
1521                                         ch->get_target(), 0);
1522                                 }
1523                                 catch (...)
1524                                 {
1525                                     // ignore errors from user callback
1526                                 }
1527                             }
1528                         }
1529 
1530                         channel_dec_ref(i, ch);
1531                     }
1532                 }
1533             }
1534         }
1535 
1536         listener * lst = first_listener_;
1537         while ((res == core::ok) && (lst != NULL))
1538         {
1539             if (selector_.is_listener_ready(*lst))
1540             {
1541                 lst->inc_ref();
1542 
1543                 res = lst->do_some_work();
1544 
1545                 lst->dec_ref();
1546             }
1547 
1548             lst = lst->next;
1549         }
1550 
1551         // scan the list of listeners and remove those that are
1552         // no longer referenced by any thread
1553 
1554         prune_listeners();
1555 
1556         // unref the channels from the shadow array
1557         for (std::size_t i = 0; i != shadow_channels_num_; ++i)
1558         {
1559             channel * ch = shadow_channels_[i];
1560             if (ch != NULL)
1561             {
1562                 channel_dec_ref(i, ch);
1563             }
1564         }
1565 
1566         mtx_.unlock();
1567     }
1568 
1569     return res;
1570 }
1571 
1572 // synchronized by caller
process_complete_incoming_frame(core::channel_descriptor cd,const char * buffer,const std::size_t buffer_size)1573 core::result channel_group::process_complete_incoming_frame(
1574     core::channel_descriptor cd,
1575     const char * buffer, const std::size_t buffer_size)
1576 {
1577     core::result res;
1578     if (closing_)
1579     {
1580         res = core::bad_state;
1581     }
1582     else
1583     {
1584         res = core::no_such_index;
1585 
1586         std::size_t index;
1587         std::size_t sequence_number;
1588 
1589         cd.get_details(index, sequence_number);
1590 
1591         if (index < channels_num_)
1592         {
1593             channel_holder & ch_holder = channel_holders_[index];
1594             channel * ch = ch_holder.get_channel();
1595 
1596             if (ch != NULL)
1597             {
1598                 if (ch_holder.get_sequence_number() == sequence_number)
1599                 {
1600                     res = ch->process_complete_incoming_frame(
1601                         buffer, buffer_size);
1602                 }
1603             }
1604         }
1605     }
1606 
1607     return res;
1608 }
1609 
1610 #ifdef YAMI4_WITH_QNX
set_default_qnx_listening_channel_id(int chid)1611 void channel_group::set_default_qnx_listening_channel_id(int chid)
1612 {
1613     // only the first created QNX listener is remembered as default
1614     // for returning responses
1615 
1616     if (qnx_listener_channel_id_ == 0)
1617     {
1618         qnx_listener_channel_id_ = chid;
1619     }
1620 }
1621 #endif // YAMI4_WITH_QNX
1622 
interrupt_work_waiter()1623 core::result channel_group::interrupt_work_waiter()
1624 {
1625     const core::result res = selector_.interrupt();
1626     return res;
1627 }
1628 
get_channel_usage(int & max_allowed,int & used)1629 void channel_group::get_channel_usage(int & max_allowed, int & used)
1630 {
1631     selector_.get_channel_usage(max_allowed, used);
1632 }
1633 
get_pending_outgoing_bytes(core::channel_descriptor cd,std::size_t & bytes)1634 core::result channel_group::get_pending_outgoing_bytes(
1635     core::channel_descriptor cd, std::size_t & bytes)
1636 {
1637     core::result res;
1638     if (closing_)
1639     {
1640         res = core::bad_state;
1641     }
1642     else
1643     {
1644         res = core::no_such_index;
1645 
1646         std::size_t index;
1647         std::size_t sequence_number;
1648 
1649         cd.get_details(index, sequence_number);
1650 
1651         // do not report errors if channel does not exist
1652 
1653         mtx_.lock();
1654 
1655         if (index < channels_num_)
1656         {
1657             channel_holder & ch_holder = channel_holders_[index];
1658             channel * ch = ch_holder.get_channel();
1659 
1660             if (ch != NULL)
1661             {
1662                 if (ch_holder.get_sequence_number() == sequence_number)
1663                 {
1664                     bytes = ch->get_pending_outgoing_bytes();
1665 
1666                     res = core::ok;
1667                 }
1668             }
1669         }
1670 
1671         mtx_.unlock();
1672     }
1673 
1674     return res;
1675 }
1676 
get_pending_outgoing_bytes(const char * target,std::size_t & bytes)1677 core::result channel_group::get_pending_outgoing_bytes(
1678     const char * target, std::size_t & bytes)
1679 {
1680     core::result res;
1681     if (closing_)
1682     {
1683         res = core::bad_state;
1684     }
1685     else
1686     {
1687         std::size_t index;
1688         std::size_t dummy_sequence_number;
1689         channel * ch;
1690 
1691         mtx_.lock();
1692 
1693         // try to find the existing channel with the same target
1694         res = find_existing_channel(target,
1695             index, dummy_sequence_number, ch);
1696 
1697         if (res == core::ok)
1698         {
1699             bytes = ch->get_pending_outgoing_bytes();
1700         }
1701 
1702         mtx_.unlock();
1703     }
1704 
1705     return res;
1706 }
1707 
1708 // synchronized by caller
find_existing_channel(const char * target,std::size_t & index,std::size_t & sequence_number,channel * & ch) const1709 core::result channel_group::find_existing_channel(const char * target,
1710     std::size_t & index, std::size_t & sequence_number, channel * & ch) const
1711 {
1712     core::result res = core::no_such_name;
1713 
1714     for (std::size_t i = 0; i != channels_num_; ++i)
1715     {
1716         const channel_holder & ch_holder = channel_holders_[i];
1717         channel * chi = ch_holder.get_channel();
1718 
1719         if (chi != NULL)
1720         {
1721             if (std::strcmp(chi->get_target(), target) == 0)
1722             {
1723                 index = i;
1724                 sequence_number = ch_holder.get_sequence_number();
1725                 ch = chi;
1726                 res = core::ok;
1727                 break;
1728             }
1729         }
1730     }
1731 
1732     return res;
1733 }
1734 
1735 // synchronized by caller
find_unused_channel(std::size_t & index,bool reserve)1736 core::result channel_group::find_unused_channel(
1737     std::size_t & index, bool reserve)
1738 {
1739     bool found = false;
1740     for (std::size_t i = 0; found == false && i != channels_num_; ++i)
1741     {
1742         channel_holder & ch_holder = channel_holders_[i];
1743         const channel * ch = ch_holder.get_channel();
1744 
1745         if (ch_holder.is_reserved() == false && ch == NULL)
1746         {
1747             index = i;
1748             found = true;
1749 
1750             if (reserve)
1751             {
1752                 ch_holder.reserve();
1753             }
1754         }
1755     }
1756 
1757     core::result res;
1758     if (found)
1759     {
1760         res = core::ok;
1761     }
1762     else
1763     {
1764         const std::size_t initial_block_size = 10;
1765 
1766         std::size_t new_channels_num;
1767         if (channels_num_ < initial_block_size)
1768         {
1769             new_channels_num = initial_block_size;
1770         }
1771         else
1772         {
1773             new_channels_num = 2 * channels_num_;
1774         }
1775 
1776         channel_holder * new_channel_holders = static_cast<channel_holder *>(
1777             alloc_->allocate(sizeof(channel_holder) * new_channels_num));
1778         if (new_channel_holders != NULL)
1779         {
1780             std::memcpy(new_channel_holders, channel_holders_,
1781                 sizeof(channel_holder) * channels_num_);
1782 
1783             for (std::size_t i = channels_num_; i != new_channels_num; ++i)
1784             {
1785                 channel_holder * ch_holder = new_channel_holders + i;
1786                 ch_holder->init();
1787             }
1788 
1789             // pick the first slot after those that already existed
1790             index = channels_num_;
1791             if (reserve)
1792             {
1793                 new_channel_holders[index].reserve();
1794             }
1795 
1796             if (channel_holders_ != NULL)
1797             {
1798                 alloc_->deallocate(channel_holders_);
1799             }
1800 
1801             channel_holders_ = new_channel_holders;
1802             channels_num_ = new_channels_num;
1803 
1804             res = core::ok;
1805         }
1806         else
1807         {
1808             res = core::no_memory;
1809         }
1810     }
1811 
1812     return res;
1813 }
1814 
1815 // synchronized by caller
channel_dec_ref(std::size_t index,channel * ch)1816 bool channel_group::channel_dec_ref(std::size_t index, channel * ch)
1817 {
1818     bool destroyed = false;
1819 
1820     ch->dec_ref();
1821     if (ch->can_be_removed())
1822     {
1823         channel_holders_[index].clean();
1824         const char * target = ch->move_target();
1825 
1826         // the user callback can be invoked from this,
1827         // but the state of all structures is safe at this point
1828 
1829         ch->clean();
1830         alloc_->deallocate(ch);
1831 
1832         if (event_notification_callback_ != NULL)
1833         {
1834             try
1835             {
1836                 event_notification_callback_(
1837                     event_notification_hint_,
1838                     core::connection_closed,
1839                     target, 0);
1840             }
1841             catch (...)
1842             {
1843                 // ignore errors from user callback
1844             }
1845         }
1846 
1847         if (disconnection_hook_ != NULL)
1848         {
1849             // user callback is performed without the lock
1850             mtx_.unlock();
1851 
1852             try
1853             {
1854                 disconnection_hook_(disconnection_hook_hint_,
1855                     target, core::channel_closed);
1856             }
1857             catch (...)
1858             {
1859                 // ignore errors from user callback
1860             }
1861 
1862             mtx_.lock();
1863         }
1864 
1865         alloc_->deallocate(target);
1866 
1867         destroyed = true;
1868     }
1869 
1870     return destroyed;
1871 }
1872 
1873 // synchronized by caller
prune_listeners()1874 void channel_group::prune_listeners()
1875 {
1876     listener * * removal_point = &first_listener_;
1877     while (*removal_point != NULL)
1878     {
1879         listener * lst = *removal_point;
1880         if (lst->can_be_removed())
1881         {
1882             if (event_notification_callback_ != NULL)
1883             {
1884                 try
1885                 {
1886                     event_notification_callback_(
1887                         event_notification_hint_,
1888                         core::listener_removed,
1889                         lst->get_target(), 0);
1890                 }
1891                 catch (...)
1892                 {
1893                     // ignore errors from user callback
1894                 }
1895             }
1896 
1897             *removal_point = lst->next;
1898             lst->clean();
1899             alloc_->deallocate(lst);
1900         }
1901         else
1902         {
1903             removal_point = &(lst->next);
1904         }
1905     }
1906 }
1907 
1908 // synchronized by caller
generate_message_id()1909 std::size_t channel_group::generate_message_id()
1910 {
1911     return ++last_message_id_;
1912 }
1913