1 #include "tcp_trsp.h"
2 #include "ip_util.h"
3 #include "parse_common.h"
4 #include "sip_parser.h"
5 #include "trans_layer.h"
6 #include "hash.h"
7 
8 #include "AmUtils.h"
9 
10 #include <netdb.h>
11 #include <event2/event.h>
12 #include <string.h>
13 #include <fcntl.h>
14 #include <sys/ioctl.h>
15 
16 
17 void tcp_trsp_socket::on_sock_read(int fd, short ev, void* arg)
18 {
19   if(ev & (EV_READ|EV_TIMEOUT)){
20     ((tcp_trsp_socket*)arg)->on_read(ev);
21   }
22 }
23 
24 void tcp_trsp_socket::on_sock_write(int fd, short ev, void* arg)
25 {
26   if(ev & (EV_WRITE|EV_TIMEOUT)){
27     ((tcp_trsp_socket*)arg)->on_write(ev);
28   }
29 }
30 
31 tcp_trsp_socket::tcp_trsp_socket(tcp_server_socket* server_sock,
32 				 tcp_server_worker* server_worker,
33 				 int sd, const sockaddr_storage* sa,
34 				 struct event_base* evbase)
35   : trsp_socket(server_sock->get_if(),0,0,sd),
36     server_sock(server_sock), server_worker(server_worker),
37     closed(false), connected(false),
38     input_len(0), evbase(evbase),
39     read_ev(NULL), write_ev(NULL)
40 {
~sip_ua()41   // local address
42   ip = server_sock->get_ip();
43   port = server_sock->get_port();
44   server_sock->copy_addr_to(&addr);
45 
46   // peer address
47   memcpy(&peer_addr,sa,sizeof(sockaddr_storage));
48 
49   char host[NI_MAXHOST] = "";
50   peer_ip = am_inet_ntop(&peer_addr,host,NI_MAXHOST);
51   peer_port = am_get_port(&peer_addr);
52 
53   // async parser state
54   pst.reset((char*)input_buf);
55 
56   if(sd > 0) {
57     create_events();
58   }
59 }
60 
61 void tcp_trsp_socket::create_connected(tcp_server_socket* server_sock,
62 				       tcp_server_worker* server_worker,
63 				       int sd, const sockaddr_storage* sa,
64 				       struct event_base* evbase)
65 {
66   if(sd < 0)
67     return;
68 
69   tcp_trsp_socket* sock = new tcp_trsp_socket(server_sock,server_worker,
70 					      sd,sa,evbase);
71 
72   inc_ref(sock);
73   server_worker->add_connection(sock);
74 
75   sock->connected = true;
76   sock->add_read_event();
77   dec_ref(sock);
78 }
79 
80 tcp_trsp_socket* tcp_trsp_socket::new_connection(tcp_server_socket* server_sock,
81 						 tcp_server_worker* server_worker,
82 						 const sockaddr_storage* sa,
83 						 struct event_base* evbase)
84 {
85   return new tcp_trsp_socket(server_sock,server_worker,-1,sa,evbase);
86 }
87 
88 
89 tcp_trsp_socket::~tcp_trsp_socket()
90 {
91   DBG("********* connection destructor ***********");
92   event_free(read_ev);
93   event_free(write_ev);
94 }
95 
96 void tcp_trsp_socket::create_events()
97 {
98   read_ev = event_new(evbase, sd, EV_READ|EV_PERSIST,
99 		      tcp_trsp_socket::on_sock_read,
100 		      (void *)this);
101 
102   write_ev = event_new(evbase, sd, EV_WRITE,
103 		       tcp_trsp_socket::on_sock_write,
104 		       (void *)this);
105 }
106 
107 void tcp_trsp_socket::add_read_event_ul()
108 {
109   sock_mut.unlock();
110   add_read_event();
111   sock_mut.lock();
112 }
113 
114 void tcp_trsp_socket::add_read_event()
115 {
116   event_add(read_ev, server_sock->get_idle_timeout());
117 }
118 
119 void tcp_trsp_socket::add_write_event_ul(struct timeval* timeout)
120 {
121   sock_mut.unlock();
122   add_write_event(timeout);
123   sock_mut.lock();
124 }
125 
126 void tcp_trsp_socket::add_write_event(struct timeval* timeout)
127 {
128   event_add(write_ev, timeout);
129 }
130 
131 void tcp_trsp_socket::copy_peer_addr(sockaddr_storage* sa)
132 {
133   memcpy(sa,&peer_addr,sizeof(sockaddr_storage));
134 }
135 
136 tcp_trsp_socket::msg_buf::msg_buf(const sockaddr_storage* sa, const char* msg,
137 				  const int msg_len)
138   : msg_len(msg_len)
139 {
140   memcpy(&addr,sa,sizeof(sockaddr_storage));
141   cursor = this->msg = new char[msg_len];
142   memcpy(this->msg,msg,msg_len);
143 }
144 
145 tcp_trsp_socket::msg_buf::~msg_buf()
146 {
147   delete [] msg;
148 }
149 
150 int tcp_trsp_socket::on_connect(short ev)
151 {
152   DBG("************ on_connect() ***********");
153 
154   if(ev & EV_TIMEOUT) {
155     DBG("********** connection timeout on sd=%i ************\n",sd);
156     close();
157     return -1;
158   }
159 
160   socklen_t len = sizeof(int);
161   int error = 0;
162   if(getsockopt(sd, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
163     ERROR("getsockopt: %s",strerror(errno));
164     close();
165     return -1;
166   }
167 
168   if(error != 0) {
169     DBG("*********** connection error (sd=%i): %s *********",
170 	sd,strerror(error));
171     close();
172     return -1;
173   }
174 
175   connected = true;
176   add_read_event();
177 
178   return 0;
179 }
180 
181 int tcp_trsp_socket::connect()
182 {
183   if(sd > 0) {
184     ERROR("pending connection request: close first.");
185     return -1;
186   }
187 
188   if((sd = socket(peer_addr.ss_family,SOCK_STREAM,0)) == -1){
189     ERROR("socket: %s\n",strerror(errno));
190     return -1;
191   }
192 
193   int true_opt = 1;
194   if(ioctl(sd, FIONBIO , &true_opt) == -1) {
195     ERROR("could not make new connection non-blocking: %s\n",strerror(errno));
196     ::close(sd);
197     sd = -1;
198     return -1;
199   }
200 
201   DBG("connecting to %s:%i...",
202       am_inet_ntop(&peer_addr).c_str(),
203       am_get_port(&peer_addr));
204 
205   return ::connect(sd, (const struct sockaddr*)&peer_addr,
206 		   SA_len(&peer_addr));
207 }
208 
209 int tcp_trsp_socket::check_connection()
210 {
211   if(sd < 0){
212     int ret = connect();
213     if(ret < 0) {
214       if(errno != EINPROGRESS && errno != EALREADY) {
215 	ERROR("could not connect: %s",strerror(errno));
216 	::close(sd);
217 	sd = -1;
218 	return -1;
219       }
220     }
221 
222     // it's time to create the events...
223     create_events();
224 
225     if(ret < 0) {
226       add_write_event_ul(server_sock->get_connect_timeout());
227       DBG("connect event added...");
228 
229       // because of unlock in ad_write_event_ul,
230       // on_connect() might already have been scheduled
231       if(closed)
232 	return -1;
233     }
234     else {
235       // connect succeeded immediatly
236       connected = true;
237       add_read_event_ul();
238     }
239   }
240 
241   return 0;
242 }
243 
244 int tcp_trsp_socket::send(const sockaddr_storage* sa, const char* msg,
245 			  const int msg_len, unsigned int flags)
246 {
247   AmLock _l(sock_mut);
248 
249   if(closed || (check_connection() < 0))
250     return -1;
251 
252   send_q.push_back(new msg_buf(sa,msg,msg_len));
253 
254   if(connected) {
255     add_write_event_ul();
256     DBG("write event added...");
257   }
258 
259   return 0;
260 }
261 
262 void tcp_trsp_socket::close()
263 {
264   inc_ref(this);
265   server_worker->remove_connection(this);
266 
267   closed = true;
268   DBG("********* closing connection ***********");
269 
270   event_del(read_ev);
271   event_del(write_ev);
272 
273   if(sd > 0) {
274     ::close(sd);
275     sd = -1;
276   }
277 
278   generate_transport_errors();
279   dec_ref(this);
280 }
281 
282 void tcp_trsp_socket::generate_transport_errors()
283 {
284   while(!send_q.empty()) {
285 
286     msg_buf* msg = send_q.front();
287     send_q.pop_front();
288 
289     sip_msg s_msg(msg->msg,msg->msg_len);
290     delete msg;
291 
292     copy_peer_addr(&s_msg.remote_ip);
293     copy_addr_to(&s_msg.local_ip);
294 
295     trans_layer::instance()->transport_error(&s_msg);
296   }
297 }
298 
299 void tcp_trsp_socket::on_read(short ev)
300 {
301   int bytes = 0;
302   char* old_cursor = (char*)get_input();
303 
304   {// locked section
305 
306     if(ev & EV_TIMEOUT) {
307       DBG("************ idle timeout: closing connection **********");
308       close();
309       return;
310     }
311 
312     AmLock _l(sock_mut);
313     DBG("on_read (connected = %i)",connected);
314 
315     bytes = ::read(sd,get_input(),get_input_free_space());
316     if(bytes < 0) {
317       switch(errno) {
318       case EAGAIN:
319 	return; // nothing to read
320 
321       case ECONNRESET:
322       case ENOTCONN:
323 	DBG("connection has been closed (sd=%i)",sd);
324 	close();
325 	return;
326 
327       case ETIMEDOUT:
328 	DBG("transmission timeout (sd=%i)",sd);
329 	close();
330 	return;
331 
332       default:
333 	DBG("unknown error (%i): %s",errno,strerror(errno));
334 	close();
335 	return;
336       }
337     }
338     else if(bytes == 0) {
339       // connection closed
340       DBG("connection has been closed (sd=%i)",sd);
341       close();
342       return;
343     }
344   }// end of - locked section
345 
346   input_len += bytes;
347 
348   DBG("received: <%.*s>",bytes,old_cursor);
349 
350   // ... and parse it
351   if(parse_input() < 0) {
352     DBG("Error while parsing input: closing connection!");
353     sock_mut.lock();
354     close();
355     sock_mut.unlock();
356   }
357 }
358 
359 int tcp_trsp_socket::parse_input()
360 {
361   for(;;) {
362     int err = skip_sip_msg_async(&pst, (char*)(input_buf+input_len));
363     if(err) {
364 
365       if(err == UNEXPECTED_EOT) {
366 
367 	if(pst.orig_buf > (char*)input_buf) {
368 
369 	  int addr_shift = pst.orig_buf - (char*)input_buf;
370 	  memmove(input_buf, pst.orig_buf, input_len - addr_shift);
371 
372 	  pst.orig_buf = (char*)input_buf;
373 	  pst.c -= addr_shift;
374 	  if(pst.beg)
375 	    pst.beg -= addr_shift;
376 	  input_len -= addr_shift;
377 
378 	  return 0;
379 	}
380 	else if(get_input_free_space()){
381 	  return 0;
382 	}
383 
384 	ERROR("message way too big! drop connection...");
385       }
386       else {
387 	ERROR("parsing error %i",err);
388       }
389 
390       pst.reset((char*)input_buf);
391       reset_input();
392 
393       return -1;
394     }
395 
396     int msg_len = pst.get_msg_len();
397     DBG("received msg:\n%.*s",msg_len,pst.orig_buf);
398 
399     sip_msg* s_msg = new sip_msg((const char*)pst.orig_buf,msg_len);
400 
401     copy_peer_addr(&s_msg->remote_ip);
402     copy_addr_to(&s_msg->local_ip);
403 
404     s_msg->local_socket = this;
405     inc_ref(this);
406 
407     // pass message to the parser / transaction layer
408     trans_layer::instance()->received_msg(s_msg);
409 
410     char* msg_end = pst.orig_buf + msg_len;
411     char* input_end = (char*)input_buf + input_len;
412 
413     if(msg_end < input_end) {
414       pst.reset(msg_end);
415     }
416     else {
417       pst.reset((char*)input_buf);
418       reset_input();
419       return 0;
420     }
421   }
422 
423   // fake:
424   //return 0;
425 }
426 
427 void tcp_trsp_socket::on_write(short ev)
428 {
429   AmLock _l(sock_mut);
430 
431   DBG("on_write (connected = %i)",connected);
432   if(!connected) {
433     if(on_connect(ev) != 0) {
434       return;
435     }
436   }
437 
438   while(!send_q.empty()) {
439 
440     msg_buf* msg = send_q.front();
441     if(!msg || !msg->bytes_left()) {
442       send_q.pop_front();
443       delete msg;
444       continue;
445     }
446 
447     // send msg
448     int bytes = write(sd,msg->cursor,msg->bytes_left());
449     if(bytes < 0) {
450       DBG("error on write: %i",bytes);
451       switch(errno){
452       case EINTR:
453       case EAGAIN: // would block
454 	add_write_event();
455 	break;
456 
457       default: // unforseen error: close connection
458 	ERROR("unforseen error: close connection (%i/%s)",
459 	      errno,strerror(errno));
460 	close();
461 	break;
462       }
463       return;
464     }
465 
466     DBG("bytes written: <%.*s>",bytes,msg->cursor);
467 
468     if(bytes < msg->bytes_left()) {
469       msg->cursor += bytes;
470       add_write_event();
471       return;
472     }
473 
474     send_q.pop_front();
475     delete msg;
476   }
477 }
478 
479 tcp_server_worker::tcp_server_worker(tcp_server_socket* server_sock)
480   : server_sock(server_sock)
481 {
482   evbase = event_base_new();
483 }
484 
485 tcp_server_worker::~tcp_server_worker()
486 {
487   event_base_free(evbase);
488 }
489 
490 void tcp_server_worker::add_connection(tcp_trsp_socket* client_sock)
491 {
492   string conn_id = client_sock->get_peer_ip()
493     + ":" + int2str(client_sock->get_peer_port());
494 
495   DBG("new TCP connection from %s:%u",
496       client_sock->get_peer_ip().c_str(),
497       client_sock->get_peer_port());
498 
499   connections_mut.lock();
500   map<string,tcp_trsp_socket*>::iterator sock_it = connections.find(conn_id);
501   if(sock_it != connections.end()) {
502     dec_ref(sock_it->second);
503     sock_it->second = client_sock;
504   }
505   else {
506     connections[conn_id] = client_sock;
507   }
508   inc_ref(client_sock);
509   connections_mut.unlock();
510 }
511 
512 void tcp_server_worker::remove_connection(tcp_trsp_socket* client_sock)
513 {
514   string conn_id = client_sock->get_peer_ip()
515     + ":" + int2str(client_sock->get_peer_port());
516 
517   DBG("removing TCP connection from %s",conn_id.c_str());
518 
519   connections_mut.lock();
520   map<string,tcp_trsp_socket*>::iterator sock_it = connections.find(conn_id);
521   if(sock_it != connections.end()) {
522     dec_ref(sock_it->second);
523     connections.erase(sock_it);
524     DBG("TCP connection from %s removed",conn_id.c_str());
525   }
526   connections_mut.unlock();
527 }
528 
529 int tcp_server_worker::send(const sockaddr_storage* sa, const char* msg,
530 			    const int msg_len, unsigned int flags)
531 {
532   char host_buf[NI_MAXHOST];
533   string dest = am_inet_ntop(sa,host_buf,NI_MAXHOST);
534   dest += ":" + int2str(am_get_port(sa));
535 
536   tcp_trsp_socket* sock = NULL;
537 
538   bool new_conn=false;
539   connections_mut.lock();
540   map<string,tcp_trsp_socket*>::iterator sock_it = connections.find(dest);
541   if(sock_it != connections.end()) {
542     sock = sock_it->second;
543     inc_ref(sock);
544   }
545   else {
546     //TODO: add flags to avoid new connections (ex: UAs behind NAT)
547     tcp_trsp_socket* new_sock = tcp_trsp_socket::new_connection(server_sock,this,
548 								sa,evbase);
549     connections[dest] = new_sock;
550     inc_ref(new_sock);
551 
552     sock = new_sock;
553     inc_ref(sock);
554     new_conn = true;
555   }
556   connections_mut.unlock();
557 
558   // must be done outside from connections_mut
559   // to avoid dead-lock with the event base
560   int ret = sock->send(sa,msg,msg_len,flags);
561   if((ret < 0) && new_conn) {
562     remove_connection(sock);
563   }
564   dec_ref(sock);
565 
566   return ret;
567 }
568 
569 void tcp_server_worker::run()
570 {
571   // fake event to prevent the event loop from exiting
572   int fake_fds[2];
573   struct event* ev_default = NULL;
574   int res = pipe(fake_fds);
575   if (res<0) {
576     ERROR("creating pipe to keep event loop running");
577   } else {
578     ev_default = event_new(evbase,fake_fds[0],
579 			   EV_READ|EV_PERSIST,
580 			   NULL,NULL);
581     event_add(ev_default,NULL);
582   }
583 
584   /* Start the event loop. */
585   (void)event_base_dispatch(evbase);
586 
587   // clean-up fake fds/event
588   if (NULL != ev_default)
589     event_free(ev_default);
590   close(fake_fds[0]);
591   close(fake_fds[1]);
592 }
593 
594 void tcp_server_worker::on_stop()
595 {
596   event_base_loopbreak(evbase);
597 }
598 
599 tcp_server_socket::tcp_server_socket(unsigned short if_num)
600   : trsp_socket(if_num,0),
601     evbase(NULL), ev_accept(NULL)
602 {
603 }
604 
605 int tcp_server_socket::bind(const string& bind_ip, unsigned short bind_port)
606 {
607   if(sd){
608     WARN("re-binding socket\n");
609     close(sd);
610   }
611 
612   if(am_inet_pton(bind_ip.c_str(),&addr) == 0){
613 
614     ERROR("am_inet_pton(%s): %s\n",bind_ip.c_str(),strerror(errno));
615     return -1;
616   }
617 
618   if( ((addr.ss_family == AF_INET) &&
619        (SAv4(&addr)->sin_addr.s_addr == INADDR_ANY)) ||
620       ((addr.ss_family == AF_INET6) &&
621        IN6_IS_ADDR_UNSPECIFIED(&SAv6(&addr)->sin6_addr)) ){
622 
623     ERROR("Sorry, we cannot bind to 'ANY' address\n");
624     return -1;
625   }
626 
627   am_set_port(&addr,bind_port);
628 
629   if((sd = socket(addr.ss_family,SOCK_STREAM,0)) == -1){
630     ERROR("socket: %s\n",strerror(errno));
631     return -1;
632   }
633 
634   int true_opt = 1;
635   if(setsockopt(sd, SOL_SOCKET, SO_REUSEADDR,
636 		(void*)&true_opt, sizeof (true_opt)) == -1) {
637 
638     ERROR("%s\n",strerror(errno));
639     close(sd);
640     return -1;
641   }
642 
643   if(ioctl(sd, FIONBIO , &true_opt) == -1) {
644     ERROR("setting non-blocking: %s\n",strerror(errno));
645     close(sd);
646     return -1;
647   }
648 
649   if(::bind(sd,(const struct sockaddr*)&addr,SA_len(&addr)) < 0) {
650 
651     ERROR("bind: %s\n",strerror(errno));
652     close(sd);
653     return -1;
654   }
655 
656   if(::listen(sd, 16) < 0) {
657     ERROR("listen: %s\n",strerror(errno));
658     close(sd);
659     return -1;
660   }
661 
662   port = bind_port;
663   ip   = bind_ip;
664 
665   DBG("TCP transport bound to %s/%i\n",ip.c_str(),port);
666 
667   return 0;
668 }
669 
670 void tcp_server_socket::on_accept(int fd, short ev, void* arg)
671 {
672   tcp_server_socket* trsp = (tcp_server_socket*)arg;
673   trsp->on_accept(fd,ev);
674 }
675 
676 uint32_t tcp_server_socket::hash_addr(const sockaddr_storage* addr)
677 {
678   unsigned int port = am_get_port(addr);
679   uint32_t h=0;
680   if(addr->ss_family == AF_INET) {
681     h = hashlittle(&SAv4(addr)->sin_addr,sizeof(in_addr),port);
682   }
683   else {
684     h = hashlittle(&SAv6(addr)->sin6_addr,sizeof(in6_addr),port);
685   }
686   return h;
687 }
688 
689 void tcp_server_socket::add_event(struct event_base *evbase)
690 {
691   this->evbase = evbase;
692 
693   if(!ev_accept) {
694     ev_accept = event_new(evbase, sd, EV_READ|EV_PERSIST,
695 			  tcp_server_socket::on_accept, (void *)this);
696     event_add(ev_accept, NULL); // no timeout
697   }
698 }
699 
700 void tcp_server_socket::add_threads(unsigned int n)
701 {
702   for(unsigned int i=0; i<n; i++) {
703     workers.push_back(new tcp_server_worker(this));
704   }
705 }
706 
707 void tcp_server_socket::start_threads()
708 {
709   for(unsigned int i=0; i<workers.size(); i++) {
710     workers[i]->start();
711   }
712 }
713 
714 void tcp_server_socket::stop_threads()
715 {
716   for(unsigned int i=0; i<workers.size(); i++) {
717     workers[i]->stop();
718   }
719 }
720 
721 void tcp_server_socket::on_accept(int sd, short ev)
722 {
723   sockaddr_storage src_addr;
724   socklen_t        src_addr_len = sizeof(sockaddr_storage);
725 
726   int connection_sd = accept(sd,(sockaddr*)&src_addr,&src_addr_len);
727   if(connection_sd < 0) {
728     WARN("error while accepting connection");
729     return;
730   }
731 
732   int true_opt = 1;
733   if(ioctl(connection_sd, FIONBIO , &true_opt) == -1) {
734     ERROR("could not make new connection non-blocking: %s\n",strerror(errno));
735     close(connection_sd);
736     return;
737   }
738 
739   uint32_t h = hash_addr(&src_addr);
740   unsigned int idx = h % workers.size();
741 
742   // in case of thread pooling, do following in worker thread
743   DBG("tcp_trsp_socket::create_connected (idx = %u)",idx);
744   tcp_trsp_socket::create_connected(this,workers[idx],connection_sd,
745 				    &src_addr,evbase);
746 }
747 
748 int tcp_server_socket::send(const sockaddr_storage* sa, const char* msg,
749 			    const int msg_len, unsigned int flags)
750 {
751   uint32_t h = hash_addr(sa);
752   unsigned int idx = h % workers.size();
753   DBG("tcp_server_socket::send: idx = %u",idx);
754   return workers[idx]->send(sa,msg,msg_len,flags);
755 }
756 
757 void tcp_server_socket::set_connect_timeout(unsigned int ms)
758 {
759   connect_timeout.tv_sec = ms / 1000;
760   connect_timeout.tv_usec = (ms % 1000) * 1000;
761 }
762 
763 void tcp_server_socket::set_idle_timeout(unsigned int ms)
764 {
765   idle_timeout.tv_sec = ms / 1000;
766   idle_timeout.tv_usec = (ms % 1000) * 1000;
767 }
768 
769 struct timeval* tcp_server_socket::get_connect_timeout()
770 {
771   if(connect_timeout.tv_sec || connect_timeout.tv_usec)
772     return &connect_timeout;
773 
774   return NULL;
775 }
776 
777 struct timeval* tcp_server_socket::get_idle_timeout()
778 {
779   if(idle_timeout.tv_sec || idle_timeout.tv_usec)
780     return &idle_timeout;
781 
782   return NULL;
783 }
784 
785 tcp_trsp::tcp_trsp(tcp_server_socket* sock)
786     : transport(sock)
787 {
788   evbase = event_base_new();
789   sock->add_event(evbase);
790 }
791 
792 tcp_trsp::~tcp_trsp()
793 {
794   if(evbase) {
795     event_base_free(evbase);
796   }
797 }
798 
799 /** @see AmThread */
800 void tcp_trsp::run()
801 {
802   int server_sd = sock->get_sd();
803   if(server_sd <= 0){
804     ERROR("Transport instance not bound\n");
805     return;
806   }
807 
808   tcp_server_socket* tcp_sock = static_cast<tcp_server_socket*>(sock);
809   tcp_sock->start_threads();
810 
811   INFO("Started SIP server TCP transport on %s:%i\n",
812        sock->get_ip(),sock->get_port());
813 
814   /* Start the event loop. */
815   int ret = event_base_dispatch(evbase);
816 
817   INFO("TCP SIP server on %s:%i finished (%i)",
818        sock->get_ip(),sock->get_port(),ret);
819 }
820 
821 /** @see AmThread */
822 void tcp_trsp::on_stop()
823 {
824   event_base_loopbreak(evbase);
825   tcp_server_socket* tcp_sock = static_cast<tcp_server_socket*>(sock);
826   tcp_sock->stop_threads();
827 }
828 
829