1 // Copyright Maciej Sobczak 2008-2019.
2 // This file is part of YAMI4.
3 //
4 // YAMI4 is free software: you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation, either version 3 of the License, or
7 // (at your option) any later version.
8 //
9 // YAMI4 is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with YAMI4. If not, see <http://www.gnu.org/licenses/>.
16
17 #include "../listener.h"
18 #include "../allocator.h"
19 #include "../channel_group.h"
20 #include "../fatal_errors.h"
21 #include "../io_error_handler.h"
22 #include "../network_utils.h"
23 #include "../options.h"
24 #include <cstdio>
25 #include <cstdlib>
26 #include <cstring>
27
28 #include <Winsock2.h>
29
30 using namespace yami;
31 using namespace details;
32
prepare_tcp(const char * address)33 core::result listener::prepare_tcp(const char * address)
34 {
35 protocol_ = proto_tcp;
36
37 ip_address ipa;
38 core::result res = parse_address(*alloc_, address, ipa,
39 io_error_callback_, io_error_callback_hint_);
40
41 if (res == core::ok)
42 {
43 io_descriptor_type fd = ::socket(AF_INET, SOCK_STREAM, 0);
44 if (fd != INVALID_SOCKET)
45 {
46 res = core::ok;
47 if (group_->get_options().tcp_reuseaddr)
48 {
49 res = set_reuseaddr(fd);
50 }
51 }
52 else
53 {
54 handle_io_error("create tcp listener socket",
55 io_error_callback_, io_error_callback_hint_);
56
57 res = core::io_error;
58 }
59
60 if (res == core::ok)
61 {
62 sockaddr_in local;
63
64 std::memset(&local, 0, sizeof(local));
65
66 local.sin_family = AF_INET;
67 local.sin_port = static_cast<u_short>(ipa.port);
68
69 if (ipa.host != 0)
70 {
71 local.sin_addr.s_addr = ipa.host;
72 }
73 else
74 {
75 local.sin_addr.s_addr = htonl(INADDR_ANY);
76 }
77
78 int cc = ::bind(fd,
79 reinterpret_cast<sockaddr *>(&local), sizeof(local));
80 if (cc != SOCKET_ERROR)
81 {
82 const int backlog =
83 group_->get_options().tcp_listen_backlog;
84
85 cc = ::listen(fd, backlog);
86 if (cc != SOCKET_ERROR)
87 {
88 fd_ = fd;
89 }
90 else
91 {
92 handle_io_error("tcp listen",
93 io_error_callback_, io_error_callback_hint_);
94
95 res = core::io_error;
96 }
97
98 if (res == core::ok)
99 {
100 // recreate the target based on (possibly)
101 // system-assigned values
102
103 // max length of the string representing the target
104 // (arbitrary, but should allow for prefix,
105 // address and port)
106 // note that address alone can be 256 chars
107 const std::size_t max_target_length = 270;
108
109 char * buf = static_cast<char *>(
110 alloc_->allocate(max_target_length));
111 if (buf != NULL)
112 {
113 int dummy = sizeof(local);
114 cc = ::getsockname(fd,
115 reinterpret_cast<sockaddr*>(&local), &dummy);
116 if (cc != SOCKET_ERROR)
117 {
118 const int assigned_port = ntohs(local.sin_port);
119
120 const int assigned_addr =
121 ntohl(local.sin_addr.s_addr);
122
123 if (assigned_addr == 0)
124 {
125 // if assigned address is local ANY,
126 // recreate it from gethostname
127
128 char this_host[max_target_length];
129 cc = ::gethostname(
130 this_host, max_target_length);
131 if (cc == 0)
132 {
133 _snprintf(buf, max_target_length,
134 "tcp://%s:%d",
135 this_host, assigned_port);
136 }
137 else
138 {
139 handle_io_error(
140 "tcp listener gethostname",
141 io_error_callback_,
142 io_error_callback_hint_);
143
144 res = core::io_error;
145 }
146 }
147 else
148 {
149 // the address was provided by user
150 // and resolved to number in the parse phase
151
152 const unsigned char * addr_bytes =
153 reinterpret_cast<const unsigned char *>(
154 &assigned_addr);
155
156 _snprintf(buf, max_target_length,
157 "tcp://%d.%d.%d.%d:%d",
158 static_cast<int>(addr_bytes[3]),
159 static_cast<int>(addr_bytes[2]),
160 static_cast<int>(addr_bytes[1]),
161 static_cast<int>(addr_bytes[0]),
162 assigned_port);
163 }
164
165 target_ = buf;
166 }
167 else
168 {
169 alloc_->deallocate(buf);
170
171 handle_io_error("tcp listener getsockname",
172 io_error_callback_, io_error_callback_hint_);
173
174 res = core::io_error;
175 }
176 }
177 else
178 {
179 res = core::no_memory;
180 }
181 }
182 }
183 else
184 {
185 handle_io_error("tcp listener bind",
186 io_error_callback_, io_error_callback_hint_);
187
188 res = core::io_error;
189 }
190 }
191
192 if (res != core::ok)
193 {
194 if (fd != INVALID_SOCKET)
195 {
196 ::closesocket(fd);
197 }
198
199 if (target_ != NULL)
200 {
201 alloc_->deallocate(target_);
202 }
203 }
204 }
205
206 return res;
207 }
208
prepare_udp(const char * address)209 core::result listener::prepare_udp(const char * address)
210 {
211 protocol_ = proto_udp;
212
213 ip_address ipa;
214 core::result res = parse_address(*alloc_, address, ipa,
215 io_error_callback_, io_error_callback_hint_);
216
217 if (res == core::ok)
218 {
219 io_descriptor_type fd = ::socket(AF_INET, SOCK_DGRAM, 0);
220 if (fd != INVALID_SOCKET)
221 {
222 sockaddr_in local;
223
224 std::memset(&local, 0, sizeof(local));
225
226 local.sin_family = AF_INET;
227 local.sin_port = ipa.port;
228
229 if (ipa.host != 0)
230 {
231 local.sin_addr.s_addr = ipa.host;
232 }
233 else
234 {
235 local.sin_addr.s_addr = htonl(INADDR_ANY);
236 }
237
238 int cc = ::bind(fd,
239 reinterpret_cast<sockaddr *>(&local), sizeof(local));
240 if (cc != SOCKET_ERROR)
241 {
242 fd_ = fd;
243
244 // recreate the target based on (possibly)
245 // system-assigned values
246
247 // max length of the string representing the target
248 // (arbitrary, but should allow for prefix,
249 // address and port)
250 // note that address alone can be 256 chars
251 const std::size_t max_target_length = 270;
252
253 char * buf = static_cast<char *>(
254 alloc_->allocate(max_target_length));
255 if (buf != NULL)
256 {
257 int dummy = sizeof(local);
258 cc = ::getsockname(fd,
259 reinterpret_cast<sockaddr*>(&local), &dummy);
260 if (cc != SOCKET_ERROR)
261 {
262 const int assigned_port = ntohs(local.sin_port);
263
264 // assigned_addr is in network byte order
265 const int assigned_addr = local.sin_addr.s_addr;
266
267 if (assigned_addr == 0)
268 {
269 // if assigned address is local ANY,
270 // recreate it from gethostname
271
272 char this_host[max_target_length];
273 cc = ::gethostname(
274 this_host, max_target_length);
275 if (cc == 0)
276 {
277 _snprintf(buf, max_target_length,
278 "udp://%s:%d",
279 this_host, assigned_port);
280 }
281 else
282 {
283 handle_io_error(
284 "udp listener gethostname",
285 io_error_callback_,
286 io_error_callback_hint_);
287
288 res = core::io_error;
289 }
290 }
291 else
292 {
293 // the address was provided by user
294 // and resolved to number in the parse phase
295
296 const unsigned char * addr_bytes =
297 reinterpret_cast<const unsigned char *>(
298 &assigned_addr);
299
300 // note: address is in network byte order
301 _snprintf(buf, max_target_length,
302 "udp://%d.%d.%d.%d:%d",
303 static_cast<int>(addr_bytes[0]),
304 static_cast<int>(addr_bytes[1]),
305 static_cast<int>(addr_bytes[2]),
306 static_cast<int>(addr_bytes[3]),
307 assigned_port);
308 }
309
310 target_ = buf;
311 }
312 else
313 {
314 alloc_->deallocate(buf);
315
316 handle_io_error("udp listener getsockname",
317 io_error_callback_, io_error_callback_hint_);
318
319 res = core::io_error;
320 }
321 }
322 else
323 {
324 res = core::no_memory;
325 }
326 }
327 else
328 {
329 handle_io_error("udp listener bind",
330 io_error_callback_, io_error_callback_hint_);
331
332 res = core::io_error;
333 }
334 }
335 else
336 {
337 handle_io_error("create udp listener socket",
338 io_error_callback_, io_error_callback_hint_);
339
340 res = core::io_error;
341 }
342
343 if (res == core::ok)
344 {
345 // allocate udp frame buffer of appropriate size
346
347 frame_buffer_size_ = group_->get_options().udp_frame_size;
348
349 frame_buffer_ = static_cast<char *>(
350 alloc_->allocate(frame_buffer_size_));
351 if (frame_buffer_ == NULL)
352 {
353 res = core::no_memory;
354 }
355 }
356
357 if (res != core::ok)
358 {
359 if (fd != INVALID_SOCKET)
360 {
361 ::closesocket(fd);
362 }
363
364 if (target_ != NULL)
365 {
366 alloc_->deallocate(target_);
367 }
368
369 if (frame_buffer_ != NULL)
370 {
371 alloc_->deallocate(frame_buffer_);
372 }
373 }
374 }
375
376 return res;
377 }
378
prepare_unix(const char * path)379 core::result listener::prepare_unix(const char * path)
380 {
381 return core::bad_protocol;
382 }
383
384 #ifdef YAMI4_WITH_OPEN_SSL
prepare_tcps(const char * address)385 core::result listener::prepare_tcps(const char * address)
386 {
387 protocol_ = proto_tcps;
388
389 ip_address ipa;
390 core::result res = parse_address(*alloc_, address, ipa,
391 io_error_callback_, io_error_callback_hint_);
392
393 if (res == core::ok)
394 {
395 io_descriptor_type fd = ::socket(AF_INET, SOCK_STREAM, 0);
396 if (fd != INVALID_SOCKET)
397 {
398 res = core::ok;
399 if (group_->get_options().tcp_reuseaddr)
400 {
401 res = set_reuseaddr(fd);
402 }
403 }
404 else
405 {
406 handle_io_error("create tcps listener socket",
407 io_error_callback_, io_error_callback_hint_);
408
409 res = core::io_error;
410 }
411
412 if (res == core::ok)
413 {
414 sockaddr_in local;
415
416 std::memset(&local, 0, sizeof(local));
417
418 local.sin_family = AF_INET;
419 local.sin_port = static_cast<u_short>(ipa.port);
420
421 if (ipa.host != 0)
422 {
423 local.sin_addr.s_addr = ipa.host;
424 }
425 else
426 {
427 local.sin_addr.s_addr = htonl(INADDR_ANY);
428 }
429
430 int cc = ::bind(fd,
431 reinterpret_cast<sockaddr *>(&local), sizeof(local));
432 if (cc != SOCKET_ERROR)
433 {
434 const int backlog =
435 group_->get_options().tcp_listen_backlog;
436
437 cc = ::listen(fd, backlog);
438 if (cc != SOCKET_ERROR)
439 {
440 fd_ = fd;
441 }
442 else
443 {
444 handle_io_error("tcps listen",
445 io_error_callback_, io_error_callback_hint_);
446
447 res = core::io_error;
448 }
449
450 if (res == core::ok)
451 {
452 // recreate the target based on (possibly)
453 // system-assigned values
454
455 // max length of the string representing the target
456 // (arbitrary, but should allow for prefix,
457 // address and port)
458 // note that address alone can be 256 chars
459 const std::size_t max_target_length = 270;
460
461 char * buf = static_cast<char *>(
462 alloc_->allocate(max_target_length));
463 if (buf != NULL)
464 {
465 int dummy = sizeof(local);
466 cc = ::getsockname(fd,
467 reinterpret_cast<sockaddr*>(&local), &dummy);
468 if (cc != SOCKET_ERROR)
469 {
470 const int assigned_port = ntohs(local.sin_port);
471
472 const int assigned_addr =
473 ntohl(local.sin_addr.s_addr);
474
475 if (assigned_addr == 0)
476 {
477 // if assigned address is local ANY,
478 // recreate it from gethostname
479
480 char this_host[max_target_length];
481 cc = ::gethostname(
482 this_host, max_target_length);
483 if (cc == 0)
484 {
485 _snprintf(buf, max_target_length,
486 "tcps://%s:%d",
487 this_host, assigned_port);
488 }
489 else
490 {
491 handle_io_error(
492 "tcps listener gethostname",
493 io_error_callback_,
494 io_error_callback_hint_);
495
496 res = core::io_error;
497 }
498 }
499 else
500 {
501 // the address was provided by user
502 // and resolved to number in the parse phase
503
504 const unsigned char * addr_bytes =
505 reinterpret_cast<const unsigned char *>(
506 &assigned_addr);
507
508 _snprintf(buf, max_target_length,
509 "tcps://%d.%d.%d.%d:%d",
510 static_cast<int>(addr_bytes[3]),
511 static_cast<int>(addr_bytes[2]),
512 static_cast<int>(addr_bytes[1]),
513 static_cast<int>(addr_bytes[0]),
514 assigned_port);
515 }
516
517 target_ = buf;
518 }
519 else
520 {
521 alloc_->deallocate(buf);
522
523 handle_io_error("tcps listener getsockname",
524 io_error_callback_, io_error_callback_hint_);
525
526 res = core::io_error;
527 }
528 }
529 else
530 {
531 res = core::no_memory;
532 }
533 }
534 }
535 else
536 {
537 handle_io_error("tcps listener bind",
538 io_error_callback_, io_error_callback_hint_);
539
540 res = core::io_error;
541 }
542 }
543
544 if (res != core::ok)
545 {
546 if (fd != INVALID_SOCKET)
547 {
548 ::closesocket(fd);
549 }
550
551 if (target_ != NULL)
552 {
553 alloc_->deallocate(target_);
554 }
555 }
556 }
557
558 return res;
559 }
560 #endif // YAMI4_WITH_OPEN_SSL
561
close_resource()562 void listener::close_resource()
563 {
564 ::closesocket(fd_);
565 }
566
567 // synchronized by caller
do_some_work()568 core::result listener::do_some_work()
569 {
570 // accept new connection from the listening socket
571 // assume that the listening socket is already ready for "reading"
572 // (otherwise this operation can block)
573
574 core::result res;
575
576 switch (protocol_)
577 {
578 case proto_tcp:
579 res = accept_tcp();
580 break;
581 case proto_udp:
582 res = accept_udp();
583 break;
584 #ifdef YAMI4_WITH_OPEN_SSL
585 case proto_tcps:
586 res = accept_tcps();
587 break;
588 #endif // YAMI4_WITH_OPEN_SSL
589
590 // the proto_unix branch will never be taken,
591 // as such listener can not be created
592 default:
593 fatal_failure(__FILE__, __LINE__);
594 }
595
596 return res;
597 }
598
accept_tcp()599 core::result listener::accept_tcp()
600 {
601 // accept new connection from the listening socket
602 // assume that the listening socket is already ready for "reading"
603 // (otherwise this operation can block)
604
605 core::result res;
606
607 sockaddr_in client_addr;
608 int client_addr_size = sizeof(client_addr);
609
610 const io_descriptor_type new_fd = ::accept(fd_,
611 reinterpret_cast<sockaddr *>(&client_addr), &client_addr_size);
612 if (new_fd != INVALID_SOCKET)
613 {
614 res = core::ok;
615
616 if (group_->get_options().tcp_nonblocking)
617 {
618 res = set_nonblocking(new_fd);
619 }
620
621 if (res == core::ok)
622 {
623 if (group_->get_options().tcp_nodelay)
624 {
625 res = set_nodelay(new_fd);
626 }
627 }
628
629 if (res == core::ok)
630 {
631 if (group_->get_options().tcp_keepalive)
632 {
633 res = set_keepalive(new_fd);
634 }
635 }
636
637 if (res == core::ok)
638 {
639 // extract the client address and create new channel
640 // with that address as the target
641
642 // target created for the new channel will have the form:
643 // "tcp://xxx.xxx.xxx.xxx:yyyyyyy"
644 const std::size_t target_size = 30;
645 char * target = static_cast<char *>(
646 alloc_->allocate(target_size));
647 if (target != NULL)
648 {
649 const unsigned char * tmp =
650 reinterpret_cast<unsigned char *>(&client_addr.sin_addr);
651
652 _snprintf(target, target_size, "tcp://%d.%d.%d.%d:%d",
653 static_cast<int>(tmp[0]),
654 static_cast<int>(tmp[1]),
655 static_cast<int>(tmp[2]),
656 static_cast<int>(tmp[3]),
657 ntohs(client_addr.sin_port));
658
659 const std::size_t preferred_frame_size =
660 group_->get_options().tcp_frame_size;
661
662 core::channel_descriptor new_descriptor;
663 channel * dummy_new_channel;
664
665 res = group_->add_existing(target, new_fd, proto_tcp,
666 preferred_frame_size, new_descriptor, dummy_new_channel);
667
668 // at this point the new channel is already seen
669 // in the proper state
670
671 if (res == core::ok && connection_hook_ != NULL)
672 {
673 mtx_->unlock();
674
675 std::size_t index;
676 std::size_t sequence_number;
677 new_descriptor.get_details(index, sequence_number);
678
679 try
680 {
681 connection_hook_(connection_hook_hint_,
682 target, index, sequence_number);
683 }
684 catch (...)
685 {
686 // ignore exceptions from user code
687 }
688
689 mtx_->lock();
690 }
691
692 if (res != core::ok)
693 {
694 alloc_->deallocate(target);
695 }
696 }
697 else
698 {
699 res = core::no_memory;
700 }
701 }
702
703 if (res != core::ok)
704 {
705 ::closesocket(new_fd);
706 }
707 }
708 else
709 {
710 handle_io_error("tcp listener accept",
711 io_error_callback_, io_error_callback_hint_);
712
713 res = core::io_error;
714 }
715
716 return res;
717 }
718
accept_udp()719 core::result listener::accept_udp()
720 {
721 // "Accepting" at the UDP listener is an artificial term
722 // that really relates to receiving of regular datagram
723 // and not to establishing any connection.
724 // The datagram is received and placed in the appropriate channel
725 // according to the sender address information.
726
727 core::result res;
728
729 sockaddr_in client_addr;
730 int client_addr_size = sizeof(client_addr);
731
732 const int readn =
733 ::recvfrom(fd_, frame_buffer_, frame_buffer_size_, 0,
734 reinterpret_cast<sockaddr *>(&client_addr), &client_addr_size);
735 if (readn >= 0)
736 {
737 // extract the client address and make sure that a channel
738 // with that address as the target exists
739
740 // target created for the channel will have the form:
741 // "udp://xxx.xxx.xxx.xxx:yyyyyyy"
742 const std::size_t target_size = 30;
743 char * target = static_cast<char *>(
744 alloc_->allocate(target_size));
745 if (target != NULL)
746 {
747 const unsigned char * tmp =
748 reinterpret_cast<unsigned char *>(&client_addr.sin_addr);
749
750 _snprintf(target, target_size, "udp://%d.%d.%d.%d:%d",
751 static_cast<int>(tmp[0]),
752 static_cast<int>(tmp[1]),
753 static_cast<int>(tmp[2]),
754 static_cast<int>(tmp[3]),
755 ntohs(client_addr.sin_port));
756
757 core::channel_descriptor descriptor;
758
759 // make sure the channel for the client exists
760 // (create if it is a new channel)
761
762 bool created_new_channel;
763 res = group_->open(target, descriptor,
764 created_new_channel, false);
765
766 alloc_->deallocate(target);
767
768 // at this point the new channel is already seen
769 // in the proper state
770
771 if (created_new_channel)
772 {
773 // this channel was freshly created
774 // -> call the connection hook
775
776 if (res == core::ok && connection_hook_ != NULL)
777 {
778 mtx_->unlock();
779
780 std::size_t index;
781 std::size_t sequence_number;
782 descriptor.get_details(index, sequence_number);
783
784 try
785 {
786 connection_hook_(connection_hook_hint_,
787 target, index, sequence_number);
788 }
789 catch (...)
790 {
791 // ignore exceptions from user code
792 }
793
794 mtx_->lock();
795 }
796 }
797
798 if (res == core::ok)
799 {
800 // inject the received datagram into the channel
801
802 res = group_->process_complete_incoming_frame(
803 descriptor, frame_buffer_, readn);
804 }
805 }
806 else
807 {
808 res = core::no_memory;
809 }
810 }
811 else
812 {
813 handle_io_error("udp listener receive",
814 io_error_callback_, io_error_callback_hint_);
815
816 res = core::io_error;
817 }
818
819 return res;
820 }
821
accept_unix()822 core::result listener::accept_unix()
823 {
824 // this will never be called
825
826 return core::bad_protocol;
827 }
828
829 #ifdef YAMI4_WITH_OPEN_SSL
accept_tcps()830 core::result listener::accept_tcps()
831 {
832 // accept new connection from the listening socket
833 // assume that the listening socket is already ready for "reading"
834 // (otherwise this operation can block)
835
836 core::result res;
837
838 sockaddr_in client_addr;
839 int client_addr_size = sizeof(client_addr);
840
841 const io_descriptor_type new_fd = ::accept(fd_,
842 reinterpret_cast<sockaddr *>(&client_addr), &client_addr_size);
843 if (new_fd != INVALID_SOCKET)
844 {
845 res = core::ok;
846
847 if (group_->get_options().tcp_nonblocking)
848 {
849 res = set_nonblocking(new_fd);
850 }
851
852 if (res == core::ok)
853 {
854 if (group_->get_options().tcp_nodelay)
855 {
856 res = set_nodelay(new_fd);
857 }
858 }
859
860 if (res == core::ok)
861 {
862 if (group_->get_options().tcp_keepalive)
863 {
864 res = set_keepalive(new_fd);
865 }
866 }
867
868 if (res == core::ok)
869 {
870 // extract the client address and create new channel
871 // with that address as the target
872
873 // target created for the new channel will have the form:
874 // "tcps://xxx.xxx.xxx.xxx:yyyyyyy"
875 const std::size_t target_size = 30;
876 char * target = static_cast<char *>(
877 alloc_->allocate(target_size));
878 if (target != NULL)
879 {
880 const unsigned char * tmp =
881 reinterpret_cast<unsigned char *>(&client_addr.sin_addr);
882
883 _snprintf(target, target_size, "tcps://%d.%d.%d.%d:%d",
884 static_cast<int>(tmp[0]),
885 static_cast<int>(tmp[1]),
886 static_cast<int>(tmp[2]),
887 static_cast<int>(tmp[3]),
888 ntohs(client_addr.sin_port));
889
890 const std::size_t preferred_frame_size =
891 group_->get_options().tcp_frame_size;
892
893 core::channel_descriptor new_descriptor;
894 channel * dummy_new_channel;
895
896 res = group_->add_existing_ssl(target, new_fd, proto_tcps,
897 preferred_frame_size, new_descriptor);
898
899 // at this point the new channel is already seen
900 // in the proper state
901
902 if (res == core::ok && connection_hook_ != NULL)
903 {
904 mtx_->unlock();
905
906 std::size_t index;
907 std::size_t sequence_number;
908 new_descriptor.get_details(index, sequence_number);
909
910 try
911 {
912 connection_hook_(connection_hook_hint_,
913 target, index, sequence_number);
914 }
915 catch (...)
916 {
917 // ignore exceptions from user code
918 }
919
920 mtx_->lock();
921 }
922
923 if (res != core::ok)
924 {
925 alloc_->deallocate(target);
926 }
927 }
928 else
929 {
930 res = core::no_memory;
931 }
932 }
933
934 if (res != core::ok)
935 {
936 ::closesocket(new_fd);
937 }
938 }
939 else
940 {
941 handle_io_error("tcps listener accept",
942 io_error_callback_, io_error_callback_hint_);
943
944 res = core::io_error;
945 }
946
947 return res;
948 }
949 #endif // YAMI4_WITH_OPEN_SSL
950