1 /* $Id$ */
2 /*
3  * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
4  * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  */
20 #include <pjlib.h>
21 #include "test.h"
22 
23 static pj_ioqueue_key_t *key;
24 static pj_atomic_t *total_bytes;
25 static pj_bool_t thread_quit_flag;
26 
27 struct op_key
28 {
29     pj_ioqueue_op_key_t  op_key_;
30     struct op_key       *peer;
31     char                *buffer;
32     pj_size_t            size;
33     int                  is_pending;
34     pj_status_t          last_err;
35     pj_sockaddr_in       addr;
36     int                  addrlen;
37 };
38 
/*
 * Read-completion callback: echo the received datagram back to its sender
 * and queue the next receive.
 *
 * ioq_key         Key of the socket the operation belongs to.
 * op_key          The receive record; it is the first member of a
 *                 struct op_key and is cast back to that type.
 * bytes_received  > 0: number of bytes now in the receive buffer.
 *                 = 0: nothing to echo; only (re)start the receive. This is
 *                      what on_write_complete() passes.
 *                 < 0: negated pj_status_t error code.
 *
 * The function loops for as long as pj_ioqueue_recvfrom() completes
 * immediately, and returns once a receive is queued (PJ_EPENDING) or a send
 * is still in flight. In the latter case the write-completion callback calls
 * back in here to restart the receive.
 */
static void on_read_complete(pj_ioqueue_key_t *ioq_key,
                             pj_ioqueue_op_key_t *op_key,
                             pj_ssize_t bytes_received)
{
    pj_status_t rc;
    struct op_key *recv_rec = (struct op_key *)op_key;

    for (;;) {
        struct op_key *send_rec = recv_rec->peer;
        recv_rec->is_pending = 0;

        if (bytes_received < 0) {
            /* Log an error only when it differs from the previous one. */
            pj_status_t err = (pj_status_t)-bytes_received;
            if (err != recv_rec->last_err) {
                recv_rec->last_err = err;
                app_perror("...error receiving data", recv_rec->last_err);
            }
        } else if (bytes_received == 0) {
            /* note: previous error, or write callback */
        } else {
            pj_atomic_add(total_bytes, (pj_atomic_value_t)bytes_received);

            /* The single send buffer is busy while a send is in flight;
             * in that case this datagram is counted but not echoed.
             */
            if (!send_rec->is_pending) {
                pj_ssize_t sent = bytes_received;
                pj_memcpy(send_rec->buffer, recv_rec->buffer,
                          (pj_size_t)bytes_received);
                pj_memcpy(&send_rec->addr, &recv_rec->addr, recv_rec->addrlen);
                send_rec->addrlen = recv_rec->addrlen;
                rc = pj_ioqueue_sendto(ioq_key, &send_rec->op_key_,
                                       send_rec->buffer, &sent, 0,
                                       &send_rec->addr, send_rec->addrlen);
                send_rec->is_pending = (rc==PJ_EPENDING);

                if (rc!=PJ_SUCCESS && rc!=PJ_EPENDING) {
                    app_perror("...send error(1)", rc);
                }
            }
        }

        if (!send_rec->is_pending) {
            bytes_received = (pj_ssize_t)recv_rec->size;
            /* addrlen is an in/out argument: restore the full capacity of
             * the address buffer so a shortened value left by an earlier
             * call cannot truncate the next source address.
             */
            recv_rec->addrlen = sizeof(recv_rec->addr);
            rc = pj_ioqueue_recvfrom(ioq_key, &recv_rec->op_key_,
                                     recv_rec->buffer, &bytes_received, 0,
                                     &recv_rec->addr, &recv_rec->addrlen);
            recv_rec->is_pending = (rc==PJ_EPENDING);
            if (rc == PJ_SUCCESS) {
                /* Data already available: handle it on the next pass. */
            } else if (rc == PJ_EPENDING) {
                /* Receive queued; the ioqueue will call back later. */
                break;
            } else {
                /* Immediate failure. Log it only when the status changes,
                 * as the other error paths do, so that a persistent error
                 * does not flood the log while the loop retries.
                 */
                if (rc != recv_rec->last_err) {
                    recv_rec->last_err = rc;
                    app_perror("...recv error", rc);
                }

                bytes_received = 0;
                /* Retry the receive on the next pass. */
            }
        } else {
            /* recv will be done when write completion callback is called. */
            break;
        }
    }
}
101 
/*
 * Write-completion callback: record the outcome of the send, mark the send
 * record idle and restart the receive on the peer record.
 *
 * ioq_key     Key of the socket the operation belongs to.
 * op_key      The send record; it is the first member of a struct op_key
 *             and is cast back to that type.
 * bytes_sent  Number of bytes written, or a negated pj_status_t on error.
 */
static void on_write_complete(pj_ioqueue_key_t *ioq_key,
                              pj_ioqueue_op_key_t *op_key,
                              pj_ssize_t bytes_sent)
{
    struct op_key *send_rec = (struct op_key*)op_key;

    /* Only a negative value carries an error code. Zero is not a failure
     * and must not be reported through app_perror() as status 0.
     */
    if (bytes_sent < 0) {
        pj_status_t rc = (pj_status_t)-bytes_sent;
        if (rc != send_rec->last_err) {
            send_rec->last_err = rc;
            app_perror("...send error(2)", rc);
        }
    }

    send_rec->is_pending = 0;

    /* Passing 0 bytes makes the read handler skip the echo step and just
     * queue the next receive.
     */
    on_read_complete(ioq_key, &send_rec->peer->op_key_, 0);
}
119 
worker_thread(void * arg)120 static int worker_thread(void *arg)
121 {
122     pj_ioqueue_t *ioqueue = (pj_ioqueue_t*) arg;
123     struct op_key read_op, write_op;
124     char recv_buf[512], send_buf[512];
125     pj_ssize_t length;
126     pj_status_t rc;
127 
128     read_op.peer = &write_op;
129     read_op.is_pending = 0;
130     read_op.last_err = 0;
131     read_op.buffer = recv_buf;
132     read_op.size = sizeof(recv_buf);
133     read_op.addrlen = sizeof(read_op.addr);
134 
135     write_op.peer = &read_op;
136     write_op.is_pending = 0;
137     write_op.last_err = 0;
138     write_op.buffer = send_buf;
139     write_op.size = sizeof(send_buf);
140 
141     length = sizeof(recv_buf);
142     rc = pj_ioqueue_recvfrom(key, &read_op.op_key_, recv_buf, &length, 0,
143                              &read_op.addr, &read_op.addrlen);
144     if (rc == PJ_SUCCESS) {
145         read_op.is_pending = 1;
146         on_read_complete(key, &read_op.op_key_, length);
147     }
148 
149     while (!thread_quit_flag) {
150         pj_time_val timeout;
151         timeout.sec = 0; timeout.msec = 10;
152         rc = pj_ioqueue_poll(ioqueue, &timeout);
153     }
154     return 0;
155 }
156 
udp_echo_srv_ioqueue(void)157 int udp_echo_srv_ioqueue(void)
158 {
159     pj_pool_t *pool;
160     pj_sock_t sock;
161     pj_ioqueue_t *ioqueue;
162     pj_ioqueue_callback callback;
163     int i;
164     pj_thread_t *thread[ECHO_SERVER_MAX_THREADS];
165     pj_status_t rc;
166 
167     pj_bzero(&callback, sizeof(callback));
168     callback.on_read_complete = &on_read_complete;
169     callback.on_write_complete = &on_write_complete;
170 
171     pool = pj_pool_create(mem, NULL, 4000, 4000, NULL);
172     if (!pool)
173         return -10;
174 
175     rc = pj_ioqueue_create(pool, 2, &ioqueue);
176     if (rc != PJ_SUCCESS) {
177         app_perror("...pj_ioqueue_create error", rc);
178         return -20;
179     }
180 
181     rc = app_socket(pj_AF_INET(), pj_SOCK_DGRAM(), 0,
182                     ECHO_SERVER_START_PORT, &sock);
183     if (rc != PJ_SUCCESS) {
184         app_perror("...app_socket error", rc);
185         return -30;
186     }
187 
188     rc = pj_ioqueue_register_sock(pool, ioqueue, sock, NULL,
189                                   &callback, &key);
190     if (rc != PJ_SUCCESS) {
191         app_perror("...error registering socket", rc);
192         return -40;
193     }
194 
195     rc = pj_atomic_create(pool, 0, &total_bytes);
196     if (rc != PJ_SUCCESS) {
197         app_perror("...error creating atomic variable", rc);
198         return -45;
199     }
200 
201     for (i=0; i<ECHO_SERVER_MAX_THREADS; ++i) {
202         rc = pj_thread_create(pool, NULL, &worker_thread, ioqueue,
203                               PJ_THREAD_DEFAULT_STACK_SIZE, 0,
204                               &thread[i]);
205         if (rc != PJ_SUCCESS) {
206             app_perror("...create thread error", rc);
207             return -50;
208         }
209     }
210 
211     echo_srv_common_loop(total_bytes);
212 
213     return 0;
214 }
215