1 #include "tcp_trsp.h" 2 #include "ip_util.h" 3 #include "parse_common.h" 4 #include "sip_parser.h" 5 #include "trans_layer.h" 6 #include "hash.h" 7 8 #include "AmUtils.h" 9 10 #include <netdb.h> 11 #include <event2/event.h> 12 #include <string.h> 13 #include <fcntl.h> 14 #include <sys/ioctl.h> 15 16 17 void tcp_trsp_socket::on_sock_read(int fd, short ev, void* arg) 18 { 19 if(ev & (EV_READ|EV_TIMEOUT)){ 20 ((tcp_trsp_socket*)arg)->on_read(ev); 21 } 22 } 23 24 void tcp_trsp_socket::on_sock_write(int fd, short ev, void* arg) 25 { 26 if(ev & (EV_WRITE|EV_TIMEOUT)){ 27 ((tcp_trsp_socket*)arg)->on_write(ev); 28 } 29 } 30 31 tcp_trsp_socket::tcp_trsp_socket(tcp_server_socket* server_sock, 32 tcp_server_worker* server_worker, 33 int sd, const sockaddr_storage* sa, 34 struct event_base* evbase) 35 : trsp_socket(server_sock->get_if(),0,0,sd), 36 server_sock(server_sock), server_worker(server_worker), 37 closed(false), connected(false), 38 input_len(0), evbase(evbase), 39 read_ev(NULL), write_ev(NULL) 40 { ~sip_ua()41 // local address 42 ip = server_sock->get_ip(); 43 port = server_sock->get_port(); 44 server_sock->copy_addr_to(&addr); 45 46 // peer address 47 memcpy(&peer_addr,sa,sizeof(sockaddr_storage)); 48 49 char host[NI_MAXHOST] = ""; 50 peer_ip = am_inet_ntop(&peer_addr,host,NI_MAXHOST); 51 peer_port = am_get_port(&peer_addr); 52 53 // async parser state 54 pst.reset((char*)input_buf); 55 56 if(sd > 0) { 57 create_events(); 58 } 59 } 60 61 void tcp_trsp_socket::create_connected(tcp_server_socket* server_sock, 62 tcp_server_worker* server_worker, 63 int sd, const sockaddr_storage* sa, 64 struct event_base* evbase) 65 { 66 if(sd < 0) 67 return; 68 69 tcp_trsp_socket* sock = new tcp_trsp_socket(server_sock,server_worker, 70 sd,sa,evbase); 71 72 inc_ref(sock); 73 server_worker->add_connection(sock); 74 75 sock->connected = true; 76 sock->add_read_event(); 77 dec_ref(sock); 78 } 79 80 tcp_trsp_socket* tcp_trsp_socket::new_connection(tcp_server_socket* server_sock, 81 tcp_server_worker* server_worker, 82 const sockaddr_storage* sa, 83 struct event_base* evbase) 84 { 85 return new tcp_trsp_socket(server_sock,server_worker,-1,sa,evbase); 86 } 87 88 89 tcp_trsp_socket::~tcp_trsp_socket() 90 { 91 DBG("********* connection destructor ***********"); 92 event_free(read_ev); 93 event_free(write_ev); 94 } 95 96 void tcp_trsp_socket::create_events() 97 { 98 read_ev = event_new(evbase, sd, EV_READ|EV_PERSIST, 99 tcp_trsp_socket::on_sock_read, 100 (void *)this); 101 102 write_ev = event_new(evbase, sd, EV_WRITE, 103 tcp_trsp_socket::on_sock_write, 104 (void *)this); 105 } 106 107 void tcp_trsp_socket::add_read_event_ul() 108 { 109 sock_mut.unlock(); 110 add_read_event(); 111 sock_mut.lock(); 112 } 113 114 void tcp_trsp_socket::add_read_event() 115 { 116 event_add(read_ev, server_sock->get_idle_timeout()); 117 } 118 119 void tcp_trsp_socket::add_write_event_ul(struct timeval* timeout) 120 { 121 sock_mut.unlock(); 122 add_write_event(timeout); 123 sock_mut.lock(); 124 } 125 126 void tcp_trsp_socket::add_write_event(struct timeval* timeout) 127 { 128 event_add(write_ev, timeout); 129 } 130 131 void tcp_trsp_socket::copy_peer_addr(sockaddr_storage* sa) 132 { 133 memcpy(sa,&peer_addr,sizeof(sockaddr_storage)); 134 } 135 136 tcp_trsp_socket::msg_buf::msg_buf(const sockaddr_storage* sa, const char* msg, 137 const int msg_len) 138 : msg_len(msg_len) 139 { 140 memcpy(&addr,sa,sizeof(sockaddr_storage)); 141 cursor = this->msg = new char[msg_len]; 142 memcpy(this->msg,msg,msg_len); 143 } 144 145 tcp_trsp_socket::msg_buf::~msg_buf() 146 { 147 delete [] msg; 148 } 149 150 int tcp_trsp_socket::on_connect(short ev) 151 { 152 DBG("************ on_connect() ***********"); 153 154 if(ev & EV_TIMEOUT) { 155 DBG("********** connection timeout on sd=%i ************\n",sd); 156 close(); 157 return -1; 158 } 159 160 socklen_t len = sizeof(int); 161 int error = 0; 162 if(getsockopt(sd, SOL_SOCKET, SO_ERROR, &error, &len) < 0) { 163 ERROR("getsockopt: %s",strerror(errno)); 164 close(); 165 return -1; 166 } 167 168 if(error != 0) { 169 DBG("*********** connection error (sd=%i): %s *********", 170 sd,strerror(error)); 171 close(); 172 return -1; 173 } 174 175 connected = true; 176 add_read_event(); 177 178 return 0; 179 } 180 181 int tcp_trsp_socket::connect() 182 { 183 if(sd > 0) { 184 ERROR("pending connection request: close first."); 185 return -1; 186 } 187 188 if((sd = socket(peer_addr.ss_family,SOCK_STREAM,0)) == -1){ 189 ERROR("socket: %s\n",strerror(errno)); 190 return -1; 191 } 192 193 int true_opt = 1; 194 if(ioctl(sd, FIONBIO , &true_opt) == -1) { 195 ERROR("could not make new connection non-blocking: %s\n",strerror(errno)); 196 ::close(sd); 197 sd = -1; 198 return -1; 199 } 200 201 DBG("connecting to %s:%i...", 202 am_inet_ntop(&peer_addr).c_str(), 203 am_get_port(&peer_addr)); 204 205 return ::connect(sd, (const struct sockaddr*)&peer_addr, 206 SA_len(&peer_addr)); 207 } 208 209 int tcp_trsp_socket::check_connection() 210 { 211 if(sd < 0){ 212 int ret = connect(); 213 if(ret < 0) { 214 if(errno != EINPROGRESS && errno != EALREADY) { 215 ERROR("could not connect: %s",strerror(errno)); 216 ::close(sd); 217 sd = -1; 218 return -1; 219 } 220 } 221 222 // it's time to create the events... 223 create_events(); 224 225 if(ret < 0) { 226 add_write_event_ul(server_sock->get_connect_timeout()); 227 DBG("connect event added..."); 228 229 // because of unlock in ad_write_event_ul, 230 // on_connect() might already have been scheduled 231 if(closed) 232 return -1; 233 } 234 else { 235 // connect succeeded immediatly 236 connected = true; 237 add_read_event_ul(); 238 } 239 } 240 241 return 0; 242 } 243 244 int tcp_trsp_socket::send(const sockaddr_storage* sa, const char* msg, 245 const int msg_len, unsigned int flags) 246 { 247 AmLock _l(sock_mut); 248 249 if(closed || (check_connection() < 0)) 250 return -1; 251 252 send_q.push_back(new msg_buf(sa,msg,msg_len)); 253 254 if(connected) { 255 add_write_event_ul(); 256 DBG("write event added..."); 257 } 258 259 return 0; 260 } 261 262 void tcp_trsp_socket::close() 263 { 264 inc_ref(this); 265 server_worker->remove_connection(this); 266 267 closed = true; 268 DBG("********* closing connection ***********"); 269 270 event_del(read_ev); 271 event_del(write_ev); 272 273 if(sd > 0) { 274 ::close(sd); 275 sd = -1; 276 } 277 278 generate_transport_errors(); 279 dec_ref(this); 280 } 281 282 void tcp_trsp_socket::generate_transport_errors() 283 { 284 while(!send_q.empty()) { 285 286 msg_buf* msg = send_q.front(); 287 send_q.pop_front(); 288 289 sip_msg s_msg(msg->msg,msg->msg_len); 290 delete msg; 291 292 copy_peer_addr(&s_msg.remote_ip); 293 copy_addr_to(&s_msg.local_ip); 294 295 trans_layer::instance()->transport_error(&s_msg); 296 } 297 } 298 299 void tcp_trsp_socket::on_read(short ev) 300 { 301 int bytes = 0; 302 char* old_cursor = (char*)get_input(); 303 304 {// locked section 305 306 if(ev & EV_TIMEOUT) { 307 DBG("************ idle timeout: closing connection **********"); 308 close(); 309 return; 310 } 311 312 AmLock _l(sock_mut); 313 DBG("on_read (connected = %i)",connected); 314 315 bytes = ::read(sd,get_input(),get_input_free_space()); 316 if(bytes < 0) { 317 switch(errno) { 318 case EAGAIN: 319 return; // nothing to read 320 321 case ECONNRESET: 322 case ENOTCONN: 323 DBG("connection has been closed (sd=%i)",sd); 324 close(); 325 return; 326 327 case ETIMEDOUT: 328 DBG("transmission timeout (sd=%i)",sd); 329 close(); 330 return; 331 332 default: 333 DBG("unknown error (%i): %s",errno,strerror(errno)); 334 close(); 335 return; 336 } 337 } 338 else if(bytes == 0) { 339 // connection closed 340 DBG("connection has been closed (sd=%i)",sd); 341 close(); 342 return; 343 } 344 }// end of - locked section 345 346 input_len += bytes; 347 348 DBG("received: <%.*s>",bytes,old_cursor); 349 350 // ... and parse it 351 if(parse_input() < 0) { 352 DBG("Error while parsing input: closing connection!"); 353 sock_mut.lock(); 354 close(); 355 sock_mut.unlock(); 356 } 357 } 358 359 int tcp_trsp_socket::parse_input() 360 { 361 for(;;) { 362 int err = skip_sip_msg_async(&pst, (char*)(input_buf+input_len)); 363 if(err) { 364 365 if(err == UNEXPECTED_EOT) { 366 367 if(pst.orig_buf > (char*)input_buf) { 368 369 int addr_shift = pst.orig_buf - (char*)input_buf; 370 memmove(input_buf, pst.orig_buf, input_len - addr_shift); 371 372 pst.orig_buf = (char*)input_buf; 373 pst.c -= addr_shift; 374 if(pst.beg) 375 pst.beg -= addr_shift; 376 input_len -= addr_shift; 377 378 return 0; 379 } 380 else if(get_input_free_space()){ 381 return 0; 382 } 383 384 ERROR("message way too big! drop connection..."); 385 } 386 else { 387 ERROR("parsing error %i",err); 388 } 389 390 pst.reset((char*)input_buf); 391 reset_input(); 392 393 return -1; 394 } 395 396 int msg_len = pst.get_msg_len(); 397 DBG("received msg:\n%.*s",msg_len,pst.orig_buf); 398 399 sip_msg* s_msg = new sip_msg((const char*)pst.orig_buf,msg_len); 400 401 copy_peer_addr(&s_msg->remote_ip); 402 copy_addr_to(&s_msg->local_ip); 403 404 s_msg->local_socket = this; 405 inc_ref(this); 406 407 // pass message to the parser / transaction layer 408 trans_layer::instance()->received_msg(s_msg); 409 410 char* msg_end = pst.orig_buf + msg_len; 411 char* input_end = (char*)input_buf + input_len; 412 413 if(msg_end < input_end) { 414 pst.reset(msg_end); 415 } 416 else { 417 pst.reset((char*)input_buf); 418 reset_input(); 419 return 0; 420 } 421 } 422 423 // fake: 424 //return 0; 425 } 426 427 void tcp_trsp_socket::on_write(short ev) 428 { 429 AmLock _l(sock_mut); 430 431 DBG("on_write (connected = %i)",connected); 432 if(!connected) { 433 if(on_connect(ev) != 0) { 434 return; 435 } 436 } 437 438 while(!send_q.empty()) { 439 440 msg_buf* msg = send_q.front(); 441 if(!msg || !msg->bytes_left()) { 442 send_q.pop_front(); 443 delete msg; 444 continue; 445 } 446 447 // send msg 448 int bytes = write(sd,msg->cursor,msg->bytes_left()); 449 if(bytes < 0) { 450 DBG("error on write: %i",bytes); 451 switch(errno){ 452 case EINTR: 453 case EAGAIN: // would block 454 add_write_event(); 455 break; 456 457 default: // unforseen error: close connection 458 ERROR("unforseen error: close connection (%i/%s)", 459 errno,strerror(errno)); 460 close(); 461 break; 462 } 463 return; 464 } 465 466 DBG("bytes written: <%.*s>",bytes,msg->cursor); 467 468 if(bytes < msg->bytes_left()) { 469 msg->cursor += bytes; 470 add_write_event(); 471 return; 472 } 473 474 send_q.pop_front(); 475 delete msg; 476 } 477 } 478 479 tcp_server_worker::tcp_server_worker(tcp_server_socket* server_sock) 480 : server_sock(server_sock) 481 { 482 evbase = event_base_new(); 483 } 484 485 tcp_server_worker::~tcp_server_worker() 486 { 487 event_base_free(evbase); 488 } 489 490 void tcp_server_worker::add_connection(tcp_trsp_socket* client_sock) 491 { 492 string conn_id = client_sock->get_peer_ip() 493 + ":" + int2str(client_sock->get_peer_port()); 494 495 DBG("new TCP connection from %s:%u", 496 client_sock->get_peer_ip().c_str(), 497 client_sock->get_peer_port()); 498 499 connections_mut.lock(); 500 map<string,tcp_trsp_socket*>::iterator sock_it = connections.find(conn_id); 501 if(sock_it != connections.end()) { 502 dec_ref(sock_it->second); 503 sock_it->second = client_sock; 504 } 505 else { 506 connections[conn_id] = client_sock; 507 } 508 inc_ref(client_sock); 509 connections_mut.unlock(); 510 } 511 512 void tcp_server_worker::remove_connection(tcp_trsp_socket* client_sock) 513 { 514 string conn_id = client_sock->get_peer_ip() 515 + ":" + int2str(client_sock->get_peer_port()); 516 517 DBG("removing TCP connection from %s",conn_id.c_str()); 518 519 connections_mut.lock(); 520 map<string,tcp_trsp_socket*>::iterator sock_it = connections.find(conn_id); 521 if(sock_it != connections.end()) { 522 dec_ref(sock_it->second); 523 connections.erase(sock_it); 524 DBG("TCP connection from %s removed",conn_id.c_str()); 525 } 526 connections_mut.unlock(); 527 } 528 529 int tcp_server_worker::send(const sockaddr_storage* sa, const char* msg, 530 const int msg_len, unsigned int flags) 531 { 532 char host_buf[NI_MAXHOST]; 533 string dest = am_inet_ntop(sa,host_buf,NI_MAXHOST); 534 dest += ":" + int2str(am_get_port(sa)); 535 536 tcp_trsp_socket* sock = NULL; 537 538 bool new_conn=false; 539 connections_mut.lock(); 540 map<string,tcp_trsp_socket*>::iterator sock_it = connections.find(dest); 541 if(sock_it != connections.end()) { 542 sock = sock_it->second; 543 inc_ref(sock); 544 } 545 else { 546 //TODO: add flags to avoid new connections (ex: UAs behind NAT) 547 tcp_trsp_socket* new_sock = tcp_trsp_socket::new_connection(server_sock,this, 548 sa,evbase); 549 connections[dest] = new_sock; 550 inc_ref(new_sock); 551 552 sock = new_sock; 553 inc_ref(sock); 554 new_conn = true; 555 } 556 connections_mut.unlock(); 557 558 // must be done outside from connections_mut 559 // to avoid dead-lock with the event base 560 int ret = sock->send(sa,msg,msg_len,flags); 561 if((ret < 0) && new_conn) { 562 remove_connection(sock); 563 } 564 dec_ref(sock); 565 566 return ret; 567 } 568 569 void tcp_server_worker::run() 570 { 571 // fake event to prevent the event loop from exiting 572 int fake_fds[2]; 573 struct event* ev_default = NULL; 574 int res = pipe(fake_fds); 575 if (res<0) { 576 ERROR("creating pipe to keep event loop running"); 577 } else { 578 ev_default = event_new(evbase,fake_fds[0], 579 EV_READ|EV_PERSIST, 580 NULL,NULL); 581 event_add(ev_default,NULL); 582 } 583 584 /* Start the event loop. */ 585 (void)event_base_dispatch(evbase); 586 587 // clean-up fake fds/event 588 if (NULL != ev_default) 589 event_free(ev_default); 590 close(fake_fds[0]); 591 close(fake_fds[1]); 592 } 593 594 void tcp_server_worker::on_stop() 595 { 596 event_base_loopbreak(evbase); 597 } 598 599 tcp_server_socket::tcp_server_socket(unsigned short if_num) 600 : trsp_socket(if_num,0), 601 evbase(NULL), ev_accept(NULL) 602 { 603 } 604 605 int tcp_server_socket::bind(const string& bind_ip, unsigned short bind_port) 606 { 607 if(sd){ 608 WARN("re-binding socket\n"); 609 close(sd); 610 } 611 612 if(am_inet_pton(bind_ip.c_str(),&addr) == 0){ 613 614 ERROR("am_inet_pton(%s): %s\n",bind_ip.c_str(),strerror(errno)); 615 return -1; 616 } 617 618 if( ((addr.ss_family == AF_INET) && 619 (SAv4(&addr)->sin_addr.s_addr == INADDR_ANY)) || 620 ((addr.ss_family == AF_INET6) && 621 IN6_IS_ADDR_UNSPECIFIED(&SAv6(&addr)->sin6_addr)) ){ 622 623 ERROR("Sorry, we cannot bind to 'ANY' address\n"); 624 return -1; 625 } 626 627 am_set_port(&addr,bind_port); 628 629 if((sd = socket(addr.ss_family,SOCK_STREAM,0)) == -1){ 630 ERROR("socket: %s\n",strerror(errno)); 631 return -1; 632 } 633 634 int true_opt = 1; 635 if(setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, 636 (void*)&true_opt, sizeof (true_opt)) == -1) { 637 638 ERROR("%s\n",strerror(errno)); 639 close(sd); 640 return -1; 641 } 642 643 if(ioctl(sd, FIONBIO , &true_opt) == -1) { 644 ERROR("setting non-blocking: %s\n",strerror(errno)); 645 close(sd); 646 return -1; 647 } 648 649 if(::bind(sd,(const struct sockaddr*)&addr,SA_len(&addr)) < 0) { 650 651 ERROR("bind: %s\n",strerror(errno)); 652 close(sd); 653 return -1; 654 } 655 656 if(::listen(sd, 16) < 0) { 657 ERROR("listen: %s\n",strerror(errno)); 658 close(sd); 659 return -1; 660 } 661 662 port = bind_port; 663 ip = bind_ip; 664 665 DBG("TCP transport bound to %s/%i\n",ip.c_str(),port); 666 667 return 0; 668 } 669 670 void tcp_server_socket::on_accept(int fd, short ev, void* arg) 671 { 672 tcp_server_socket* trsp = (tcp_server_socket*)arg; 673 trsp->on_accept(fd,ev); 674 } 675 676 uint32_t tcp_server_socket::hash_addr(const sockaddr_storage* addr) 677 { 678 unsigned int port = am_get_port(addr); 679 uint32_t h=0; 680 if(addr->ss_family == AF_INET) { 681 h = hashlittle(&SAv4(addr)->sin_addr,sizeof(in_addr),port); 682 } 683 else { 684 h = hashlittle(&SAv6(addr)->sin6_addr,sizeof(in6_addr),port); 685 } 686 return h; 687 } 688 689 void tcp_server_socket::add_event(struct event_base *evbase) 690 { 691 this->evbase = evbase; 692 693 if(!ev_accept) { 694 ev_accept = event_new(evbase, sd, EV_READ|EV_PERSIST, 695 tcp_server_socket::on_accept, (void *)this); 696 event_add(ev_accept, NULL); // no timeout 697 } 698 } 699 700 void tcp_server_socket::add_threads(unsigned int n) 701 { 702 for(unsigned int i=0; i<n; i++) { 703 workers.push_back(new tcp_server_worker(this)); 704 } 705 } 706 707 void tcp_server_socket::start_threads() 708 { 709 for(unsigned int i=0; i<workers.size(); i++) { 710 workers[i]->start(); 711 } 712 } 713 714 void tcp_server_socket::stop_threads() 715 { 716 for(unsigned int i=0; i<workers.size(); i++) { 717 workers[i]->stop(); 718 } 719 } 720 721 void tcp_server_socket::on_accept(int sd, short ev) 722 { 723 sockaddr_storage src_addr; 724 socklen_t src_addr_len = sizeof(sockaddr_storage); 725 726 int connection_sd = accept(sd,(sockaddr*)&src_addr,&src_addr_len); 727 if(connection_sd < 0) { 728 WARN("error while accepting connection"); 729 return; 730 } 731 732 int true_opt = 1; 733 if(ioctl(connection_sd, FIONBIO , &true_opt) == -1) { 734 ERROR("could not make new connection non-blocking: %s\n",strerror(errno)); 735 close(connection_sd); 736 return; 737 } 738 739 uint32_t h = hash_addr(&src_addr); 740 unsigned int idx = h % workers.size(); 741 742 // in case of thread pooling, do following in worker thread 743 DBG("tcp_trsp_socket::create_connected (idx = %u)",idx); 744 tcp_trsp_socket::create_connected(this,workers[idx],connection_sd, 745 &src_addr,evbase); 746 } 747 748 int tcp_server_socket::send(const sockaddr_storage* sa, const char* msg, 749 const int msg_len, unsigned int flags) 750 { 751 uint32_t h = hash_addr(sa); 752 unsigned int idx = h % workers.size(); 753 DBG("tcp_server_socket::send: idx = %u",idx); 754 return workers[idx]->send(sa,msg,msg_len,flags); 755 } 756 757 void tcp_server_socket::set_connect_timeout(unsigned int ms) 758 { 759 connect_timeout.tv_sec = ms / 1000; 760 connect_timeout.tv_usec = (ms % 1000) * 1000; 761 } 762 763 void tcp_server_socket::set_idle_timeout(unsigned int ms) 764 { 765 idle_timeout.tv_sec = ms / 1000; 766 idle_timeout.tv_usec = (ms % 1000) * 1000; 767 } 768 769 struct timeval* tcp_server_socket::get_connect_timeout() 770 { 771 if(connect_timeout.tv_sec || connect_timeout.tv_usec) 772 return &connect_timeout; 773 774 return NULL; 775 } 776 777 struct timeval* tcp_server_socket::get_idle_timeout() 778 { 779 if(idle_timeout.tv_sec || idle_timeout.tv_usec) 780 return &idle_timeout; 781 782 return NULL; 783 } 784 785 tcp_trsp::tcp_trsp(tcp_server_socket* sock) 786 : transport(sock) 787 { 788 evbase = event_base_new(); 789 sock->add_event(evbase); 790 } 791 792 tcp_trsp::~tcp_trsp() 793 { 794 if(evbase) { 795 event_base_free(evbase); 796 } 797 } 798 799 /** @see AmThread */ 800 void tcp_trsp::run() 801 { 802 int server_sd = sock->get_sd(); 803 if(server_sd <= 0){ 804 ERROR("Transport instance not bound\n"); 805 return; 806 } 807 808 tcp_server_socket* tcp_sock = static_cast<tcp_server_socket*>(sock); 809 tcp_sock->start_threads(); 810 811 INFO("Started SIP server TCP transport on %s:%i\n", 812 sock->get_ip(),sock->get_port()); 813 814 /* Start the event loop. */ 815 int ret = event_base_dispatch(evbase); 816 817 INFO("TCP SIP server on %s:%i finished (%i)", 818 sock->get_ip(),sock->get_port(),ret); 819 } 820 821 /** @see AmThread */ 822 void tcp_trsp::on_stop() 823 { 824 event_base_loopbreak(evbase); 825 tcp_server_socket* tcp_sock = static_cast<tcp_server_socket*>(sock); 826 tcp_sock->stop_threads(); 827 } 828 829