1 /* $Id$ */
2 /*
3 * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
4 * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 */
20 #include <pjsip/sip_transport_udp.h>
21 #include <pjsip/sip_endpoint.h>
22 #include <pjsip/sip_errno.h>
23 #include <pj/addr_resolv.h>
24 #include <pj/assert.h>
25 #include <pj/lock.h>
26 #include <pj/log.h>
27 #include <pj/os.h>
28 #include <pj/pool.h>
29 #include <pj/sock.h>
30 #include <pj/compat/socket.h>
31 #include <pj/string.h>
32
33
34 #define THIS_FILE "sip_transport_udp.c"
35
36 /**
37 * These are the target values for socket send and receive buffer sizes,
38 * respectively. They will be applied to UDP socket with setsockopt().
39 * When transport failed to set these size, it will decrease it until
40 * sufficiently large number has been successfully set.
41 *
42 * The buffer size is important, especially in WinXP/2000 machines.
43 * Basicly the lower the size, the more packets will be lost (dropped?)
44 * when we're sending (receiving?) packets in large volumes.
45 *
46 * The figure here is taken based on my experiment on WinXP/2000 machine,
47 * and with this value, the rate of dropped packet is about 8% when
48 * sending 1800 requests simultaneously (percentage taken as average
49 * after 50K requests or so).
50 *
51 * More experiments are needed probably.
52 */
53 /* 2010/01/14
54 * Too many people complained about seeing "Error setting SNDBUF" log,
55 * so lets just remove this. People who want to have SNDBUF set can
56 * still do so by declaring these two macros in config_site.h
57 */
58 #ifndef PJSIP_UDP_SO_SNDBUF_SIZE
59 /*# define PJSIP_UDP_SO_SNDBUF_SIZE (24*1024*1024)*/
60 # define PJSIP_UDP_SO_SNDBUF_SIZE 0
61 #endif
62
63 #ifndef PJSIP_UDP_SO_RCVBUF_SIZE
64 /*# define PJSIP_UDP_SO_RCVBUF_SIZE (24*1024*1024)*/
65 # define PJSIP_UDP_SO_RCVBUF_SIZE 0
66 #endif
67
68
69 /* Struct udp_transport "inherits" struct pjsip_transport */
70 struct udp_transport
71 {
72 pjsip_transport base;
73 pj_sock_t sock;
74 pj_ioqueue_key_t *key;
75 int rdata_cnt;
76 pjsip_rx_data **rdata;
77 int is_closing;
78 pj_bool_t is_paused;
79 int read_loop_spin;
80
81 /* Group lock to be used by UDP transport and ioqueue key */
82 pj_grp_lock_t *grp_lock;
83 };
84
85
86 /*
87 * Initialize transport's receive buffer from the specified pool.
88 */
init_rdata(struct udp_transport * tp,unsigned rdata_index,pj_pool_t * pool,pjsip_rx_data ** p_rdata)89 static void init_rdata(struct udp_transport *tp, unsigned rdata_index,
90 pj_pool_t *pool, pjsip_rx_data **p_rdata)
91 {
92 pjsip_rx_data *rdata;
93
94 /* Reset pool. */
95 //note: already done by caller
96 //pj_pool_reset(pool);
97
98 rdata = PJ_POOL_ZALLOC_T(pool, pjsip_rx_data);
99
100 /* Init tp_info part. */
101 rdata->tp_info.pool = pool;
102 rdata->tp_info.transport = &tp->base;
103 rdata->tp_info.tp_data = (void*)(pj_ssize_t)rdata_index;
104 rdata->tp_info.op_key.rdata = rdata;
105 pj_ioqueue_op_key_init(&rdata->tp_info.op_key.op_key,
106 sizeof(pj_ioqueue_op_key_t));
107
108 tp->rdata[rdata_index] = rdata;
109
110 if (p_rdata)
111 *p_rdata = rdata;
112 }
113
114
115 /*
116 * udp_on_read_complete()
117 *
118 * This is callback notification from ioqueue that a pending recvfrom()
119 * operation has completed.
120 */
udp_on_read_complete(pj_ioqueue_key_t * key,pj_ioqueue_op_key_t * op_key,pj_ssize_t bytes_read)121 static void udp_on_read_complete( pj_ioqueue_key_t *key,
122 pj_ioqueue_op_key_t *op_key,
123 pj_ssize_t bytes_read)
124 {
125 /* See https://trac.pjsip.org/repos/ticket/1197 */
126 enum { MAX_IMMEDIATE_PACKET = 50 };
127 pjsip_rx_data_op_key *rdata_op_key = (pjsip_rx_data_op_key*) op_key;
128 pjsip_rx_data *rdata = rdata_op_key->rdata;
129 struct udp_transport *tp = (struct udp_transport*)rdata->tp_info.transport;
130 int i;
131 pj_status_t status;
132
133 ++tp->read_loop_spin;
134
135 /* Don't do anything if transport is closing. */
136 if (tp->is_closing) {
137 tp->is_closing++;
138 goto on_return;
139 }
140
141 /* Don't do anything if transport is being paused. */
142 if (tp->is_paused)
143 goto on_return;
144
145 if (-bytes_read == PJ_ESOCKETSTOP) {
146 --tp->read_loop_spin;
147 /* Try to recover by restarting the transport. */
148 PJ_LOG(4,(tp->base.obj_name, "Restarting SIP UDP transport"));
149 status = pjsip_udp_transport_restart2(
150 &tp->base,
151 PJSIP_UDP_TRANSPORT_DESTROY_SOCKET,
152 PJ_INVALID_SOCKET,
153 &tp->base.local_addr,
154 &tp->base.local_name);
155
156 if (status != PJ_SUCCESS) {
157 PJ_PERROR(1,(THIS_FILE, status,
158 "Error restarting SIP UDP transport"));
159 }
160 return;
161 }
162
163 /*
164 * The idea of the loop is to process immediate data received by
165 * pj_ioqueue_recvfrom(), as long as i < MAX_IMMEDIATE_PACKET. When
166 * i is >= MAX_IMMEDIATE_PACKET, we force the recvfrom() operation to
167 * complete asynchronously, to allow other sockets to get their data.
168 */
169 for (i=0;; ++i) {
170 enum { MIN_SIZE = 32 };
171 pj_uint32_t flags;
172
173 /* Report the packet to transport manager. Only do so if packet size
174 * is relatively big enough for a SIP packet.
175 */
176 if (bytes_read > MIN_SIZE) {
177 pj_ssize_t size_eaten;
178 const pj_sockaddr *src_addr = &rdata->pkt_info.src_addr;
179
180 /* Init pkt_info part. */
181 rdata->pkt_info.len = bytes_read;
182 rdata->pkt_info.zero = 0;
183 pj_gettimeofday(&rdata->pkt_info.timestamp);
184 pj_sockaddr_print(src_addr, rdata->pkt_info.src_name,
185 sizeof(rdata->pkt_info.src_name), 0);
186 rdata->pkt_info.src_port = pj_sockaddr_get_port(src_addr);
187
188 size_eaten =
189 pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr,
190 rdata);
191
192 if (size_eaten < 0) {
193 pj_assert(!"It shouldn't happen!");
194 size_eaten = rdata->pkt_info.len;
195 }
196
197 /* Since this is UDP, the whole buffer is the message. */
198 rdata->pkt_info.len = 0;
199
200 } else if (bytes_read <= MIN_SIZE) {
201
202 /* TODO: */
203
204 } else if (-bytes_read != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) &&
205 -bytes_read != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) &&
206 -bytes_read != PJ_STATUS_FROM_OS(OSERR_ECONNRESET))
207 {
208
209 /* Report error to endpoint. */
210 PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt,
211 rdata->tp_info.transport->obj_name,
212 (pj_status_t)-bytes_read,
213 "Warning: pj_ioqueue_recvfrom()"
214 " callback error"));
215 }
216
217 if (i >= MAX_IMMEDIATE_PACKET) {
218 /* Force ioqueue_recvfrom() to return PJ_EPENDING */
219 flags = PJ_IOQUEUE_ALWAYS_ASYNC;
220 } else {
221 flags = 0;
222 }
223
224 /* Reset pool.
225 * Need to copy rdata fields to temp variable because they will
226 * be invalid after pj_pool_reset().
227 */
228 {
229 pj_pool_t *rdata_pool = rdata->tp_info.pool;
230 struct udp_transport *rdata_tp ;
231 unsigned rdata_index;
232
233 rdata_tp = (struct udp_transport*)rdata->tp_info.transport;
234 rdata_index = (unsigned)(unsigned long)(pj_ssize_t)
235 rdata->tp_info.tp_data;
236
237 pj_pool_reset(rdata_pool);
238 init_rdata(rdata_tp, rdata_index, rdata_pool, &rdata);
239
240 /* Change some vars to point to new location after
241 * pool reset.
242 */
243 op_key = &rdata->tp_info.op_key.op_key;
244 }
245
246 /* Only read next packet if transport is not being paused. This
247 * check handles the case where transport is paused while endpoint
248 * is still processing a SIP message.
249 */
250 if (tp->is_paused)
251 break;
252
253 /* Read next packet. */
254 bytes_read = sizeof(rdata->pkt_info.packet);
255 rdata->pkt_info.src_addr_len = sizeof(rdata->pkt_info.src_addr);
256 status = pj_ioqueue_recvfrom(key, op_key,
257 rdata->pkt_info.packet,
258 &bytes_read, flags,
259 &rdata->pkt_info.src_addr,
260 &rdata->pkt_info.src_addr_len);
261
262 if (status == PJ_SUCCESS) {
263 /* Continue loop. */
264 pj_assert(i < MAX_IMMEDIATE_PACKET);
265
266 } else if (status == PJ_EPENDING) {
267 break;
268
269 } else if (status == PJ_ECANCELLED) {
270 /* Socket is closing, quit loop */
271 break;
272
273 } else {
274
275 if (i < MAX_IMMEDIATE_PACKET) {
276
277 /* Report error to endpoint if this is not EWOULDBLOCK error.*/
278 if (status != PJ_STATUS_FROM_OS(OSERR_EWOULDBLOCK) &&
279 status != PJ_STATUS_FROM_OS(OSERR_EINPROGRESS) &&
280 status != PJ_STATUS_FROM_OS(OSERR_ECONNRESET))
281 {
282 PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt,
283 rdata->tp_info.transport->obj_name,
284 status,
285 "Warning: pj_ioqueue_recvfrom"));
286 }
287
288 /* Continue loop. */
289 bytes_read = 0;
290 } else {
291 /* This is fatal error.
292 * Ioqueue operation will stop for this transport!
293 */
294 PJSIP_ENDPT_LOG_ERROR((rdata->tp_info.transport->endpt,
295 rdata->tp_info.transport->obj_name,
296 status,
297 "FATAL: pj_ioqueue_recvfrom() error, "
298 "UDP transport stopping! Error"));
299 break;
300 }
301 }
302 }
303
304 on_return:
305 --tp->read_loop_spin;
306 }
307
308 /*
309 * udp_on_write_complete()
310 *
311 * This is callback notification from ioqueue that a pending sendto()
312 * operation has completed.
313 */
udp_on_write_complete(pj_ioqueue_key_t * key,pj_ioqueue_op_key_t * op_key,pj_ssize_t bytes_sent)314 static void udp_on_write_complete( pj_ioqueue_key_t *key,
315 pj_ioqueue_op_key_t *op_key,
316 pj_ssize_t bytes_sent)
317 {
318 struct udp_transport *tp = (struct udp_transport*)
319 pj_ioqueue_get_user_data(key);
320 pjsip_tx_data_op_key *tdata_op_key = (pjsip_tx_data_op_key*)op_key;
321
322 tdata_op_key->tdata = NULL;
323
324 if (-bytes_sent == PJ_ESOCKETSTOP) {
325 pj_status_t status;
326 /* Try to recover by restarting the transport. */
327 PJ_LOG(4,(tp->base.obj_name, "Restarting SIP UDP transport"));
328 status = pjsip_udp_transport_restart2(
329 &tp->base,
330 PJSIP_UDP_TRANSPORT_DESTROY_SOCKET,
331 PJ_INVALID_SOCKET,
332 &tp->base.local_addr,
333 &tp->base.local_name);
334
335 if (status != PJ_SUCCESS) {
336 PJ_PERROR(1,(THIS_FILE, status,
337 "Error restarting SIP UDP transport"));
338 }
339 return;
340 }
341
342 if (tdata_op_key->callback) {
343 tdata_op_key->callback(&tp->base, tdata_op_key->token, bytes_sent);
344 }
345 }
346
347 /*
348 * udp_send_msg()
349 *
350 * This function is called by transport manager (by transport->send_msg())
351 * to send outgoing message.
352 */
udp_send_msg(pjsip_transport * transport,pjsip_tx_data * tdata,const pj_sockaddr_t * rem_addr,int addr_len,void * token,pjsip_transport_callback callback)353 static pj_status_t udp_send_msg( pjsip_transport *transport,
354 pjsip_tx_data *tdata,
355 const pj_sockaddr_t *rem_addr,
356 int addr_len,
357 void *token,
358 pjsip_transport_callback callback)
359 {
360 struct udp_transport *tp = (struct udp_transport*)transport;
361 pj_ssize_t size;
362 pj_status_t status;
363
364 PJ_ASSERT_RETURN(transport && tdata, PJ_EINVAL);
365 PJ_ASSERT_RETURN(tdata->op_key.tdata == NULL, PJSIP_EPENDINGTX);
366
367 /* Return error if transport is paused */
368 if (tp->is_paused)
369 return PJSIP_ETPNOTAVAIL;
370
371 /* Init op key. */
372 tdata->op_key.tdata = tdata;
373 tdata->op_key.token = token;
374 tdata->op_key.callback = callback;
375
376 /* Send to ioqueue! */
377 size = tdata->buf.cur - tdata->buf.start;
378 status = pj_ioqueue_sendto(tp->key, (pj_ioqueue_op_key_t*)&tdata->op_key,
379 tdata->buf.start, &size, 0,
380 rem_addr, addr_len);
381
382 if (status != PJ_EPENDING) {
383 if (status == PJ_ESOCKETSTOP) {
384 /* Try to recover by restarting the transport. */
385 PJ_LOG(4,(tp->base.obj_name, "Restarting SIP UDP transport"));
386 status = pjsip_udp_transport_restart2(
387 &tp->base,
388 PJSIP_UDP_TRANSPORT_DESTROY_SOCKET,
389 PJ_INVALID_SOCKET,
390 &tp->base.local_addr,
391 &tp->base.local_name);
392
393 if (status != PJ_SUCCESS) {
394 PJ_PERROR(1,(THIS_FILE, status,
395 "Error restarting SIP UDP transport"));
396 }
397 }
398 tdata->op_key.tdata = NULL;
399 }
400
401 return status;
402 }
403
404
405 /* Clean up UDP resources */
udp_on_destroy(void * arg)406 static void udp_on_destroy(void *arg)
407 {
408 struct udp_transport *tp = (struct udp_transport*)arg;
409 int i;
410
411 /* Destroy rdata */
412 for (i=0; i<tp->rdata_cnt; ++i) {
413 pj_pool_release(tp->rdata[i]->tp_info.pool);
414 }
415
416 /* Destroy reference counter. */
417 if (tp->base.ref_cnt)
418 pj_atomic_destroy(tp->base.ref_cnt);
419
420 /* Destroy lock */
421 if (tp->base.lock)
422 pj_lock_destroy(tp->base.lock);
423
424 PJ_LOG(4,(tp->base.obj_name, "SIP UDP transport destroyed"));
425
426 /* Destroy pool. */
427 pjsip_endpt_release_pool(tp->base.endpt, tp->base.pool);
428 }
429
430
431 /*
432 * udp_destroy()
433 *
434 * This function is called by transport manager (by transport->destroy()).
435 */
udp_destroy(pjsip_transport * transport)436 static pj_status_t udp_destroy( pjsip_transport *transport )
437 {
438 struct udp_transport *tp = (struct udp_transport*)transport;
439 int i;
440
441 /* Mark this transport as closing. */
442 tp->is_closing = 1;
443
444 /* Cancel all pending operations. */
445 /* blp: NO NO NO...
446 * No need to post queued completion as we poll the ioqueue until
447 * we've got events anyway. Posting completion will only cause
448 * callback to be called twice with IOCP: one for the post completion
449 * and another one for closing the socket.
450 *
451 for (i=0; i<tp->rdata_cnt; ++i) {
452 pj_ioqueue_post_completion(tp->key,
453 &tp->rdata[i]->tp_info.op_key.op_key, -1);
454 }
455 */
456
457 /* Unregister from ioqueue. */
458 if (tp->key) {
459 pj_ioqueue_unregister(tp->key);
460 tp->key = NULL;
461 } else {
462 /* Close socket. */
463 if (tp->sock && tp->sock != PJ_INVALID_SOCKET) {
464 pj_sock_close(tp->sock);
465 tp->sock = PJ_INVALID_SOCKET;
466 }
467 }
468
469 /* Must poll ioqueue because IOCP calls the callback when socket
470 * is closed. We poll the ioqueue until all pending callbacks
471 * have been called.
472 */
473 for (i=0; i<50 && tp->is_closing < 1+tp->rdata_cnt; ++i) {
474 int cnt;
475 pj_time_val timeout = {0, 1};
476
477 cnt = pj_ioqueue_poll(pjsip_endpt_get_ioqueue(transport->endpt),
478 &timeout);
479 if (cnt == 0)
480 break;
481 }
482
483 /* When creating this transport, reference count was incremented to flag
484 * this transport as permanent so it will not be destroyed by transport
485 * manager whenever idle. Application may or may not have cleared the
486 * flag (by calling pjsip_transport_dec_ref()), so in case it has not,
487 * let's do it now, so this transport can be destroyed.
488 */
489 if (pj_atomic_get(tp->base.ref_cnt) > 0)
490 pjsip_transport_dec_ref(&tp->base);
491
492 /* Destroy transport */
493 if (tp->grp_lock) {
494 pj_grp_lock_t *grp_lock = tp->grp_lock;
495 tp->grp_lock = NULL;
496 pj_grp_lock_dec_ref(grp_lock);
497 /* Transport may have been deleted at this point */
498 } else {
499 udp_on_destroy(tp);
500 }
501
502 return PJ_SUCCESS;
503 }
504
505
506 /*
507 * udp_shutdown()
508 *
509 * Start graceful UDP shutdown.
510 */
udp_shutdown(pjsip_transport * transport)511 static pj_status_t udp_shutdown(pjsip_transport *transport)
512 {
513 return pjsip_transport_dec_ref(transport);
514 }
515
516
517 /* Create socket */
create_socket(int af,const pj_sockaddr_t * local_a,int addr_len,pj_sock_t * p_sock)518 static pj_status_t create_socket(int af, const pj_sockaddr_t *local_a,
519 int addr_len, pj_sock_t *p_sock)
520 {
521 pj_sock_t sock;
522 pj_sockaddr_in tmp_addr;
523 pj_sockaddr_in6 tmp_addr6;
524 pj_status_t status;
525
526 status = pj_sock_socket(af, pj_SOCK_DGRAM(), 0, &sock);
527 if (status != PJ_SUCCESS)
528 return status;
529
530 if (local_a == NULL) {
531 if (af == pj_AF_INET6()) {
532 pj_bzero(&tmp_addr6, sizeof(tmp_addr6));
533 tmp_addr6.sin6_family = (pj_uint16_t)af;
534 local_a = &tmp_addr6;
535 addr_len = sizeof(tmp_addr6);
536 } else {
537 pj_sockaddr_in_init(&tmp_addr, NULL, 0);
538 local_a = &tmp_addr;
539 addr_len = sizeof(tmp_addr);
540 }
541 }
542
543 status = pj_sock_bind(sock, local_a, addr_len);
544 if (status != PJ_SUCCESS) {
545 pj_sock_close(sock);
546 return status;
547 }
548
549 *p_sock = sock;
550 return PJ_SUCCESS;
551 }
552
553
554 /* Generate transport's published address */
get_published_name(pj_sock_t sock,char hostbuf[],int hostbufsz,pjsip_host_port * bound_name)555 static pj_status_t get_published_name(pj_sock_t sock,
556 char hostbuf[],
557 int hostbufsz,
558 pjsip_host_port *bound_name)
559 {
560 pj_sockaddr tmp_addr;
561 int addr_len;
562 pj_status_t status;
563
564 addr_len = sizeof(tmp_addr);
565 status = pj_sock_getsockname(sock, &tmp_addr, &addr_len);
566 if (status != PJ_SUCCESS)
567 return status;
568
569 bound_name->host.ptr = hostbuf;
570 if (tmp_addr.addr.sa_family == pj_AF_INET()) {
571 bound_name->port = pj_ntohs(tmp_addr.ipv4.sin_port);
572
573 /* If bound address specifies "0.0.0.0", get the IP address
574 * of local hostname.
575 */
576 if (tmp_addr.ipv4.sin_addr.s_addr == PJ_INADDR_ANY) {
577 pj_sockaddr hostip;
578
579 status = pj_gethostip(pj_AF_INET(), &hostip);
580 if (status != PJ_SUCCESS)
581 return status;
582
583 status = pj_inet_ntop(pj_AF_INET(), &hostip.ipv4.sin_addr,
584 hostbuf, hostbufsz);
585 } else {
586 /* Otherwise use bound address. */
587 status = pj_inet_ntop(pj_AF_INET(), &tmp_addr.ipv4.sin_addr,
588 hostbuf, hostbufsz);
589 }
590
591 } else {
592 /* If bound address specifies "INADDR_ANY" (IPv6), get the
593 * IP address of local hostname
594 */
595 pj_uint32_t loop6[4] = { 0, 0, 0, 0};
596
597 bound_name->port = pj_ntohs(tmp_addr.ipv6.sin6_port);
598
599 if (pj_memcmp(&tmp_addr.ipv6.sin6_addr, loop6, sizeof(loop6))==0) {
600 status = pj_gethostip(tmp_addr.addr.sa_family, &tmp_addr);
601 if (status != PJ_SUCCESS)
602 return status;
603 }
604
605 status = pj_inet_ntop(tmp_addr.addr.sa_family,
606 pj_sockaddr_get_addr(&tmp_addr),
607 hostbuf, hostbufsz);
608 }
609 if (status == PJ_SUCCESS) {
610 bound_name->host.slen = pj_ansi_strlen(hostbuf);
611 }
612
613
614 return status;
615 }
616
617 /* Set the published address of the transport */
udp_set_pub_name(struct udp_transport * tp,const pjsip_host_port * a_name)618 static void udp_set_pub_name(struct udp_transport *tp,
619 const pjsip_host_port *a_name)
620 {
621 enum { INFO_LEN = 80 };
622 char local_addr[PJ_INET6_ADDRSTRLEN+10];
623 char pub_addr[PJ_INET6_ADDRSTRLEN+10];
624 int len;
625
626 pj_assert(a_name->host.slen != 0);
627
628 if (pj_strcmp(&tp->base.local_name.host, &a_name->host) == 0 &&
629 tp->base.local_name.port == a_name->port)
630 {
631 return;
632 }
633
634 pj_strdup_with_null(tp->base.pool, &tp->base.local_name.host,
635 &a_name->host);
636 tp->base.local_name.port = a_name->port;
637
638 /* Update transport info. */
639 if (tp->base.info == NULL) {
640 tp->base.info = (char*) pj_pool_alloc(tp->base.pool, INFO_LEN);
641 }
642
643 pj_sockaddr_print(&tp->base.local_addr, local_addr, sizeof(local_addr), 3);
644 pj_addr_str_print(&tp->base.local_name.host,
645 tp->base.local_name.port,
646 pub_addr, sizeof(pub_addr), 1),
647
648 len = pj_ansi_snprintf( tp->base.info, INFO_LEN, "udp %s [published as %s]",
649 local_addr, pub_addr);
650 PJ_CHECK_TRUNC_STR(len, tp->base.info, INFO_LEN);
651 }
652
653 /* Set the socket handle of the transport */
udp_set_socket(struct udp_transport * tp,pj_sock_t sock,const pjsip_host_port * a_name)654 static void udp_set_socket(struct udp_transport *tp,
655 pj_sock_t sock,
656 const pjsip_host_port *a_name)
657 {
658 #if PJSIP_UDP_SO_RCVBUF_SIZE || PJSIP_UDP_SO_SNDBUF_SIZE
659 long sobuf_size;
660 pj_status_t status;
661 #endif
662
663 /* Adjust socket rcvbuf size */
664 #if PJSIP_UDP_SO_RCVBUF_SIZE
665 sobuf_size = PJSIP_UDP_SO_RCVBUF_SIZE;
666 status = pj_sock_setsockopt(sock, pj_SOL_SOCKET(), pj_SO_RCVBUF(),
667 &sobuf_size, sizeof(sobuf_size));
668 if (status != PJ_SUCCESS) {
669 PJ_PERROR(4,(THIS_FILE, status, "Error setting SO_RCVBUF"));
670 }
671 #endif
672
673 /* Adjust socket sndbuf size */
674 #if PJSIP_UDP_SO_SNDBUF_SIZE
675 sobuf_size = PJSIP_UDP_SO_SNDBUF_SIZE;
676 status = pj_sock_setsockopt(sock, pj_SOL_SOCKET(), pj_SO_SNDBUF(),
677 &sobuf_size, sizeof(sobuf_size));
678 if (status != PJ_SUCCESS) {
679 PJ_PERROR(4,(THIS_FILE, status, "Error setting SO_SNDBUF"));
680 }
681 #endif
682
683 /* Set the socket. */
684 tp->sock = sock;
685
686 /* Init address name (published address) */
687 udp_set_pub_name(tp, a_name);
688 }
689
690 /* Register socket to ioqueue */
register_to_ioqueue(struct udp_transport * tp)691 static pj_status_t register_to_ioqueue(struct udp_transport *tp)
692 {
693 pj_ioqueue_t *ioqueue;
694 pj_ioqueue_callback ioqueue_cb;
695 pj_status_t status;
696
697 /* Ignore if already registered */
698 if (tp->key != NULL)
699 return PJ_SUCCESS;
700
701 /* Create group lock if not yet (don't need to do so on UDP restart) */
702 if (!tp->grp_lock) {
703 status = pj_grp_lock_create(tp->base.pool, NULL, &tp->grp_lock);
704 if (status != PJ_SUCCESS)
705 return status;
706
707 pj_grp_lock_add_ref(tp->grp_lock);
708 pj_grp_lock_add_handler(tp->grp_lock, tp->base.pool, tp,
709 &udp_on_destroy);
710
711 tp->base.grp_lock = tp->grp_lock;
712 }
713
714 /* Register to ioqueue. */
715 ioqueue = pjsip_endpt_get_ioqueue(tp->base.endpt);
716 pj_memset(&ioqueue_cb, 0, sizeof(ioqueue_cb));
717 ioqueue_cb.on_read_complete = &udp_on_read_complete;
718 ioqueue_cb.on_write_complete = &udp_on_write_complete;
719
720 return pj_ioqueue_register_sock2(tp->base.pool, ioqueue, tp->sock,
721 tp->grp_lock, tp, &ioqueue_cb, &tp->key);
722 }
723
724 /* Start ioqueue asynchronous reading to all rdata */
start_async_read(struct udp_transport * tp)725 static pj_status_t start_async_read(struct udp_transport *tp)
726 {
727 int i;
728 pj_status_t status;
729
730 /* Start reading the ioqueue. */
731 for (i=0; i<tp->rdata_cnt; ++i) {
732 pj_ssize_t size;
733
734 size = sizeof(tp->rdata[i]->pkt_info.packet);
735 tp->rdata[i]->pkt_info.src_addr_len = sizeof(tp->rdata[i]->pkt_info.src_addr);
736 status = pj_ioqueue_recvfrom(tp->key,
737 &tp->rdata[i]->tp_info.op_key.op_key,
738 tp->rdata[i]->pkt_info.packet,
739 &size, PJ_IOQUEUE_ALWAYS_ASYNC,
740 &tp->rdata[i]->pkt_info.src_addr,
741 &tp->rdata[i]->pkt_info.src_addr_len);
742 if (status == PJ_SUCCESS) {
743 pj_assert(!"Shouldn't happen because PJ_IOQUEUE_ALWAYS_ASYNC!");
744 udp_on_read_complete(tp->key, &tp->rdata[i]->tp_info.op_key.op_key,
745 size);
746 } else if (status != PJ_EPENDING) {
747 /* Error! */
748 return status;
749 }
750 }
751
752 return PJ_SUCCESS;
753 }
754
755
756 /*
757 * pjsip_udp_transport_attach()
758 *
759 * Attach UDP socket and start transport.
760 */
transport_attach(pjsip_endpoint * endpt,pjsip_transport_type_e type,pj_sock_t sock,const pjsip_host_port * a_name,unsigned async_cnt,pjsip_transport ** p_transport)761 static pj_status_t transport_attach( pjsip_endpoint *endpt,
762 pjsip_transport_type_e type,
763 pj_sock_t sock,
764 const pjsip_host_port *a_name,
765 unsigned async_cnt,
766 pjsip_transport **p_transport)
767 {
768 pj_pool_t *pool;
769 struct udp_transport *tp;
770 const char *format, *ipv6_quoteb = "", *ipv6_quotee = "";
771 unsigned i;
772 pj_status_t status;
773
774 PJ_ASSERT_RETURN(endpt && sock!=PJ_INVALID_SOCKET && a_name && async_cnt>0,
775 PJ_EINVAL);
776
777 /* Object name. */
778 if (type & PJSIP_TRANSPORT_IPV6) {
779 pj_in6_addr dummy6;
780
781 format = "udpv6%p";
782 /* We don't need to add quote if the transport type is IPv6, but
783 * actually translated to IPv4.
784 */
785 if (pj_inet_pton(pj_AF_INET6(), &a_name->host, &dummy6)==PJ_SUCCESS) {
786 ipv6_quoteb = "[";
787 ipv6_quotee = "]";
788 }
789 } else {
790 format = "udp%p";
791 }
792
793 /* Create pool. */
794 pool = pjsip_endpt_create_pool(endpt, format, PJSIP_POOL_LEN_TRANSPORT,
795 PJSIP_POOL_INC_TRANSPORT);
796 if (!pool)
797 return PJ_ENOMEM;
798
799 /* Create the UDP transport object. */
800 tp = PJ_POOL_ZALLOC_T(pool, struct udp_transport);
801
802 /* Save pool. */
803 tp->base.pool = pool;
804
805 pj_memcpy(tp->base.obj_name, pool->obj_name, PJ_MAX_OBJ_NAME);
806
807 /* Init reference counter. */
808 status = pj_atomic_create(pool, 0, &tp->base.ref_cnt);
809 if (status != PJ_SUCCESS)
810 goto on_error;
811
812 /* Init lock. */
813 status = pj_lock_create_recursive_mutex(pool, pool->obj_name,
814 &tp->base.lock);
815 if (status != PJ_SUCCESS)
816 goto on_error;
817
818 /* Set type. */
819 tp->base.key.type = type;
820
821 /* Remote address is left zero (except the family) */
822 tp->base.key.rem_addr.addr.sa_family = (pj_uint16_t)
823 ((type & PJSIP_TRANSPORT_IPV6) ? pj_AF_INET6() : pj_AF_INET());
824
825 /* Type name. */
826 tp->base.type_name = "UDP";
827
828 /* Transport flag */
829 tp->base.flag = pjsip_transport_get_flag_from_type(type);
830
831
832 /* Length of addressess. */
833 tp->base.addr_len = sizeof(tp->base.local_addr);
834
835 /* Init local address. */
836 status = pj_sock_getsockname(sock, &tp->base.local_addr,
837 &tp->base.addr_len);
838 if (status != PJ_SUCCESS)
839 goto on_error;
840
841 /* Init remote name. */
842 if (type == PJSIP_TRANSPORT_UDP)
843 tp->base.remote_name.host = pj_str("0.0.0.0");
844 else
845 tp->base.remote_name.host = pj_str("::0");
846 tp->base.remote_name.port = 0;
847
848 /* Init direction */
849 tp->base.dir = PJSIP_TP_DIR_NONE;
850
851 /* Set endpoint. */
852 tp->base.endpt = endpt;
853
854 /* Transport manager and timer will be initialized by tpmgr */
855
856 /* Attach socket and assign name. */
857 udp_set_socket(tp, sock, a_name);
858
859 /* Register to ioqueue */
860 status = register_to_ioqueue(tp);
861 if (status != PJ_SUCCESS)
862 goto on_error;
863
864 /* Set functions. */
865 tp->base.send_msg = &udp_send_msg;
866 tp->base.do_shutdown = &udp_shutdown;
867 tp->base.destroy = &udp_destroy;
868
869 /* Register to transport manager. */
870 tp->base.tpmgr = pjsip_endpt_get_tpmgr(endpt);
871 status = pjsip_transport_register( tp->base.tpmgr, (pjsip_transport*)tp);
872 if (status != PJ_SUCCESS)
873 goto on_error;
874
875 /* This is a permanent transport, so we initialize the ref count
876 * to one so that transport manager won't destroy this transport
877 * when there's no user!
878 */
879 pjsip_transport_add_ref(&tp->base);
880
881 /* Create rdata and put it in the array. */
882 tp->rdata_cnt = 0;
883 tp->rdata = (pjsip_rx_data**)
884 pj_pool_calloc(tp->base.pool, async_cnt,
885 sizeof(pjsip_rx_data*));
886 for (i=0; i<async_cnt; ++i) {
887 pj_pool_t *rdata_pool = pjsip_endpt_create_pool(endpt, "rtd%p",
888 PJSIP_POOL_RDATA_LEN,
889 PJSIP_POOL_RDATA_INC);
890 if (!rdata_pool) {
891 pj_atomic_set(tp->base.ref_cnt, 0);
892 pjsip_transport_destroy(&tp->base);
893 return PJ_ENOMEM;
894 }
895
896 init_rdata(tp, i, rdata_pool, NULL);
897 tp->rdata_cnt++;
898 }
899
900 /* Start reading the ioqueue. */
901 status = start_async_read(tp);
902 if (status != PJ_SUCCESS) {
903 pjsip_transport_destroy(&tp->base);
904 return status;
905 }
906
907 /* Done. */
908 if (p_transport)
909 *p_transport = &tp->base;
910
911 PJ_LOG(4,(tp->base.obj_name,
912 "SIP %s started, published address is %s%.*s%s:%d",
913 pjsip_transport_get_type_desc((pjsip_transport_type_e)tp->base.key.type),
914 ipv6_quoteb,
915 (int)tp->base.local_name.host.slen,
916 tp->base.local_name.host.ptr,
917 ipv6_quotee,
918 tp->base.local_name.port));
919
920 return PJ_SUCCESS;
921
922 on_error:
923 udp_destroy((pjsip_transport*)tp);
924 return status;
925 }
926
927
pjsip_udp_transport_attach(pjsip_endpoint * endpt,pj_sock_t sock,const pjsip_host_port * a_name,unsigned async_cnt,pjsip_transport ** p_transport)928 PJ_DEF(pj_status_t) pjsip_udp_transport_attach( pjsip_endpoint *endpt,
929 pj_sock_t sock,
930 const pjsip_host_port *a_name,
931 unsigned async_cnt,
932 pjsip_transport **p_transport)
933 {
934 return transport_attach(endpt, PJSIP_TRANSPORT_UDP, sock, a_name,
935 async_cnt, p_transport);
936 }
937
pjsip_udp_transport_attach2(pjsip_endpoint * endpt,pjsip_transport_type_e type,pj_sock_t sock,const pjsip_host_port * a_name,unsigned async_cnt,pjsip_transport ** p_transport)938 PJ_DEF(pj_status_t) pjsip_udp_transport_attach2( pjsip_endpoint *endpt,
939 pjsip_transport_type_e type,
940 pj_sock_t sock,
941 const pjsip_host_port *a_name,
942 unsigned async_cnt,
943 pjsip_transport **p_transport)
944 {
945 return transport_attach(endpt, type, sock, a_name,
946 async_cnt, p_transport);
947 }
948
949
950 /*
951 * Initialize pjsip_udp_transport_cfg structure with default values.
952 */
pjsip_udp_transport_cfg_default(pjsip_udp_transport_cfg * cfg,int af)953 PJ_DEF(void) pjsip_udp_transport_cfg_default(pjsip_udp_transport_cfg *cfg,
954 int af)
955 {
956 pj_bzero(cfg, sizeof(*cfg));
957 cfg->af = af;
958 pj_sockaddr_init(cfg->af, &cfg->bind_addr, NULL, 0);
959 cfg->async_cnt = 1;
960 }
961
962
963 /*
964 * pjsip_udp_transport_start2()
965 *
966 * Create a UDP socket in the specified address and start a transport.
967 */
pjsip_udp_transport_start2(pjsip_endpoint * endpt,const pjsip_udp_transport_cfg * cfg,pjsip_transport ** p_transport)968 PJ_DEF(pj_status_t) pjsip_udp_transport_start2(
969 pjsip_endpoint *endpt,
970 const pjsip_udp_transport_cfg *cfg,
971 pjsip_transport **p_transport)
972 {
973 pj_sock_t sock;
974 pj_status_t status;
975 pjsip_host_port addr_name;
976 char addr_buf[PJ_INET6_ADDRSTRLEN];
977 pjsip_transport_type_e transport_type;
978 pj_uint16_t af;
979 int addr_len;
980
981 PJ_ASSERT_RETURN(endpt && cfg && cfg->async_cnt, PJ_EINVAL);
982
983 if (cfg->bind_addr.addr.sa_family == pj_AF_INET()) {
984 af = pj_AF_INET();
985 transport_type = PJSIP_TRANSPORT_UDP;
986 addr_len = sizeof(pj_sockaddr_in);
987 } else {
988 af = pj_AF_INET6();
989 transport_type = PJSIP_TRANSPORT_UDP6;
990 addr_len = sizeof(pj_sockaddr_in6);
991 }
992
993 status = create_socket(af, &cfg->bind_addr, addr_len, &sock);
994 if (status != PJ_SUCCESS)
995 return status;
996
997 /* Apply QoS, if specified */
998 pj_sock_apply_qos2(sock, cfg->qos_type, &cfg->qos_params,
999 2, THIS_FILE, "SIP UDP transport");
1000
1001 /* Apply sockopt, if specified */
1002 if (cfg->sockopt_params.cnt)
1003 pj_sock_setsockopt_params(sock, &cfg->sockopt_params);
1004
1005 if (cfg->addr_name.host.slen == 0) {
1006 /* Address name is not specified.
1007 * Build a name based on bound address.
1008 */
1009 status = get_published_name(sock, addr_buf, sizeof(addr_buf),
1010 &addr_name);
1011 if (status != PJ_SUCCESS) {
1012 pj_sock_close(sock);
1013 return status;
1014 }
1015 } else {
1016 addr_name = cfg->addr_name;
1017 }
1018
1019 return pjsip_udp_transport_attach2(endpt, transport_type, sock,
1020 &addr_name, cfg->async_cnt,
1021 p_transport);
1022 }
1023
1024 /*
1025 * pjsip_udp_transport_start()
1026 *
1027 * Create a UDP socket in the specified address and start a transport.
1028 */
pjsip_udp_transport_start(pjsip_endpoint * endpt,const pj_sockaddr_in * local_a,const pjsip_host_port * a_name,unsigned async_cnt,pjsip_transport ** p_transport)1029 PJ_DEF(pj_status_t) pjsip_udp_transport_start( pjsip_endpoint *endpt,
1030 const pj_sockaddr_in *local_a,
1031 const pjsip_host_port *a_name,
1032 unsigned async_cnt,
1033 pjsip_transport **p_transport)
1034 {
1035 pjsip_udp_transport_cfg cfg;
1036
1037 pjsip_udp_transport_cfg_default(&cfg, pj_AF_INET());
1038 if (local_a)
1039 pj_sockaddr_cp(&cfg.bind_addr, local_a);
1040 if (a_name)
1041 cfg.addr_name = *a_name;
1042 cfg.async_cnt = async_cnt;
1043
1044 return pjsip_udp_transport_start2(endpt, &cfg, p_transport);
1045 }
1046
1047 /*
1048 * pjsip_udp_transport_start()
1049 *
1050 * Create a UDP socket in the specified address and start a transport.
1051 */
pjsip_udp_transport_start6(pjsip_endpoint * endpt,const pj_sockaddr_in6 * local_a,const pjsip_host_port * a_name,unsigned async_cnt,pjsip_transport ** p_transport)1052 PJ_DEF(pj_status_t) pjsip_udp_transport_start6(pjsip_endpoint *endpt,
1053 const pj_sockaddr_in6 *local_a,
1054 const pjsip_host_port *a_name,
1055 unsigned async_cnt,
1056 pjsip_transport **p_transport)
1057 {
1058 pjsip_udp_transport_cfg cfg;
1059
1060 pjsip_udp_transport_cfg_default(&cfg, pj_AF_INET6());
1061 if (local_a)
1062 pj_sockaddr_cp(&cfg.bind_addr, local_a);
1063 if (a_name)
1064 cfg.addr_name = *a_name;
1065 cfg.async_cnt = async_cnt;
1066
1067 return pjsip_udp_transport_start2(endpt, &cfg, p_transport);
1068 }
1069
1070 /*
1071 * Retrieve the internal socket handle used by the UDP transport.
1072 */
pjsip_udp_transport_get_socket(pjsip_transport * transport)1073 PJ_DEF(pj_sock_t) pjsip_udp_transport_get_socket(pjsip_transport *transport)
1074 {
1075 struct udp_transport *tp;
1076
1077 PJ_ASSERT_RETURN(transport != NULL, PJ_INVALID_SOCKET);
1078
1079 tp = (struct udp_transport*) transport;
1080
1081 return tp->sock;
1082 }
1083
1084
1085 /*
1086 * Temporarily pause or shutdown the transport.
1087 */
pjsip_udp_transport_pause(pjsip_transport * transport,unsigned option)1088 PJ_DEF(pj_status_t) pjsip_udp_transport_pause(pjsip_transport *transport,
1089 unsigned option)
1090 {
1091 struct udp_transport *tp;
1092 unsigned i;
1093
1094 PJ_ASSERT_RETURN(transport != NULL, PJ_EINVAL);
1095
1096 /* Flag must be specified */
1097 PJ_ASSERT_RETURN((option & 0x03) != 0, PJ_EINVAL);
1098
1099 tp = (struct udp_transport*) transport;
1100
1101 /* Transport must not have been paused */
1102 PJ_ASSERT_RETURN(tp->is_paused==0, PJ_EINVALIDOP);
1103
1104 /* Set transport to paused first, so that when the read callback is
1105 * called by pj_ioqueue_post_completion() it will not try to
1106 * re-register the rdata.
1107 */
1108 tp->is_paused = PJ_TRUE;
1109
1110 /* Cancel the ioqueue operation. */
1111 for (i=0; i<(unsigned)tp->rdata_cnt; ++i) {
1112 pj_ioqueue_post_completion(tp->key,
1113 &tp->rdata[i]->tp_info.op_key.op_key, -1);
1114 }
1115
1116 /* Destroy the socket? */
1117 if (option & PJSIP_UDP_TRANSPORT_DESTROY_SOCKET) {
1118 if (tp->key) {
1119 /* This implicitly closes the socket */
1120 pj_ioqueue_unregister(tp->key);
1121 tp->key = NULL;
1122 } else {
1123 /* Close socket. */
1124 if (tp->sock && tp->sock != PJ_INVALID_SOCKET) {
1125 pj_sock_close(tp->sock);
1126 tp->sock = PJ_INVALID_SOCKET;
1127 }
1128 }
1129 tp->sock = PJ_INVALID_SOCKET;
1130
1131 }
1132
1133 PJ_LOG(4,(tp->base.obj_name, "SIP UDP transport paused"));
1134
1135 return PJ_SUCCESS;
1136 }
1137
1138
1139 /*
1140 * Restart transport.
1141 *
1142 * If option is KEEP_SOCKET, just re-activate ioqueue operation.
1143 *
1144 * If option is DESTROY_SOCKET:
1145 * - if socket is specified, replace.
1146 * - if socket is not specified, create and replace.
1147 */
pjsip_udp_transport_restart(pjsip_transport * transport,unsigned option,pj_sock_t sock,const pj_sockaddr_in * local,const pjsip_host_port * a_name)1148 PJ_DEF(pj_status_t) pjsip_udp_transport_restart(pjsip_transport *transport,
1149 unsigned option,
1150 pj_sock_t sock,
1151 const pj_sockaddr_in *local,
1152 const pjsip_host_port *a_name)
1153 {
1154 return pjsip_udp_transport_restart2(transport, option, sock,
1155 (pj_sockaddr*)local, a_name);
1156 }
1157
1158
pjsip_udp_transport_restart2(pjsip_transport * transport,unsigned option,pj_sock_t sock,const pj_sockaddr * local,const pjsip_host_port * a_name)1159 PJ_DEF(pj_status_t) pjsip_udp_transport_restart2(pjsip_transport *transport,
1160 unsigned option,
1161 pj_sock_t sock,
1162 const pj_sockaddr *local,
1163 const pjsip_host_port *a_name)
1164 {
1165 struct udp_transport *tp;
1166 pj_status_t status;
1167 char addr[PJ_INET6_ADDRSTRLEN+10];
1168 int i;
1169
1170 PJ_ASSERT_RETURN(transport != NULL, PJ_EINVAL);
1171 /* Flag must be specified */
1172 PJ_ASSERT_RETURN((option & 0x03) != 0, PJ_EINVAL);
1173
1174 tp = (struct udp_transport*) transport;
1175
1176 /* Pause the transport first, so that any active read loop spin will
1177 * quit as soon as possible.
1178 */
1179 tp->is_paused = PJ_TRUE;
1180
1181 if (option & PJSIP_UDP_TRANSPORT_DESTROY_SOCKET) {
1182 char addr_buf[PJ_INET6_ADDRSTRLEN];
1183 pjsip_host_port bound_name;
1184
1185 /* Request to recreate transport */
1186
1187 /* Destroy existing socket, if any. */
1188 if (tp->key) {
1189 /* This implicitly closes the socket */
1190 pj_ioqueue_unregister(tp->key);
1191 tp->key = NULL;
1192 } else {
1193 /* Close socket. */
1194 if (tp->sock && tp->sock != PJ_INVALID_SOCKET) {
1195 pj_sock_close(tp->sock);
1196 tp->sock = PJ_INVALID_SOCKET;
1197 }
1198 }
1199 tp->sock = PJ_INVALID_SOCKET;
1200
1201 /* Create the socket if it's not specified */
1202 if (sock == PJ_INVALID_SOCKET) {
1203 status = create_socket(local?local->addr.sa_family:pj_AF_UNSPEC(),
1204 local, local?pj_sockaddr_get_len(local):0,
1205 &sock);
1206 if (status != PJ_SUCCESS)
1207 return status;
1208 }
1209
1210 /* If transport published name is not specified, calculate it
1211 * from the bound address.
1212 */
1213 if (a_name == NULL) {
1214 status = get_published_name(sock, addr_buf, sizeof(addr_buf),
1215 &bound_name);
1216 if (status != PJ_SUCCESS) {
1217 pj_sock_close(sock);
1218 return status;
1219 }
1220
1221 a_name = &bound_name;
1222 }
1223
1224 /* Init local address. */
1225 status = pj_sock_getsockname(sock, &tp->base.local_addr,
1226 &tp->base.addr_len);
1227 if (status != PJ_SUCCESS) {
1228 pj_sock_close(sock);
1229 return status;
1230 }
1231
1232 /* Assign the socket and published address to transport. */
1233 udp_set_socket(tp, sock, a_name);
1234
1235 } else {
1236
1237 /* For KEEP_SOCKET, transport must have been paused before */
1238 PJ_ASSERT_RETURN(tp->is_paused, PJ_EINVALIDOP);
1239
1240 /* If address name is specified, update it */
1241 if (a_name != NULL)
1242 udp_set_pub_name(tp, a_name);
1243 }
1244
1245 /* Make sure all udp_on_read_complete() loop spin are stopped */
1246 do {
1247 pj_thread_sleep(1);
1248 } while (tp->read_loop_spin);
1249
1250 /* Re-register new or existing socket to ioqueue. */
1251 status = register_to_ioqueue(tp);
1252 if (status != PJ_SUCCESS) {
1253 return status;
1254 }
1255
1256 /* Re-init op_key. */
1257 for (i = 0; i < tp->rdata_cnt; ++i) {
1258 pj_ioqueue_op_key_init(&tp->rdata[i]->tp_info.op_key.op_key,
1259 sizeof(pj_ioqueue_op_key_t));
1260 }
1261
1262 /* Restart async read operation. */
1263 status = start_async_read(tp);
1264 if (status != PJ_SUCCESS)
1265 return status;
1266
1267 /* Everything has been set up */
1268 tp->is_paused = PJ_FALSE;
1269
1270 PJ_LOG(4, (tp->base.obj_name,
1271 "SIP UDP transport restarted, published address is %s",
1272 pj_addr_str_print(&tp->base.local_name.host,
1273 tp->base.local_name.port,
1274 addr, sizeof(addr), 1)));
1275
1276 return PJ_SUCCESS;
1277 }
1278