1 /* $Id$ */
2 /*
3  * Copyright (C) 2008-2009 Teluu Inc. (http://www.teluu.com)
4  * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  */
20 #ifndef __PJPP_PROACTOR_HPP__
21 #define __PJPP_PROACTOR_HPP__
22 
23 #include <pj/ioqueue.h>
24 #include <pj++/pool.hpp>
25 #include <pj++/sock.hpp>
26 #include <pj++/timer.hpp>
27 #include <pj/errno.h>
28 
29 class Pj_Proactor;
30 class Pj_Event_Handler;
31 
32 
33 //////////////////////////////////////////////////////////////////////////////
34 // Asynchronous operation key.
35 //
36 // Applications may inheric this class to put their application
37 // specific data.
38 //
39 class Pj_Async_Op : public pj_ioqueue_op_key_t
40 {
41 public:
42     //
43     // Construct with null handler.
44     // App must call set_handler() before use.
45     //
Pj_Async_Op()46     Pj_Async_Op()
47         : handler_(NULL)
48     {
49 	pj_ioqueue_op_key_init(this, sizeof(*this));
50     }
51 
52     //
53     // Constructor.
54     //
Pj_Async_Op(Pj_Event_Handler * handler)55     explicit Pj_Async_Op(Pj_Event_Handler *handler)
56         : handler_(handler)
57     {
58 	pj_ioqueue_op_key_init(this, sizeof(*this));
59     }
60 
61     //
62     // Set handler.
63     //
set_handler(Pj_Event_Handler * handler)64     void set_handler(Pj_Event_Handler *handler)
65     {
66         handler_ = handler;
67     }
68 
69     //
70     // Check whether operation is still pending for this key.
71     //
72     bool is_pending();
73 
74     //
75     // Cancel the operation.
76     //
77     bool cancel(pj_ssize_t bytes_status=-PJ_ECANCELLED);
78 
79 protected:
80     Pj_Event_Handler *handler_;
81 };
82 
83 
84 //////////////////////////////////////////////////////////////////////////////
85 // Event handler.
86 //
87 // Applications should inherit this class to receive various event
88 // notifications.
89 //
90 // Applications should implement get_socket_handle().
91 //
92 class Pj_Event_Handler : public Pj_Object
93 {
94     friend class Pj_Proactor;
95 public:
96     //
97     // Default constructor.
98     //
Pj_Event_Handler()99     Pj_Event_Handler()
100         : key_(NULL)
101     {
102         pj_memset(&timer_, 0, sizeof(timer_));
103         timer_.user_data = this;
104         timer_.cb = &timer_callback;
105     }
106 
107     //
108     // Destroy.
109     //
~Pj_Event_Handler()110     virtual ~Pj_Event_Handler()
111     {
112         unregister();
113     }
114 
115     //
116     // Unregister this handler from the ioqueue.
117     //
unregister()118     void unregister()
119     {
120         if (key_) {
121             pj_ioqueue_unregister(key_);
122             key_ = NULL;
123         }
124     }
125 
126     //
127     // Get socket handle associated with this.
128     //
get_socket_handle()129     virtual pj_sock_t get_socket_handle()
130     {
131         return PJ_INVALID_SOCKET;
132     }
133 
134     //
135     // Start async receive.
136     //
recv(Pj_Async_Op * op_key,void * buf,pj_ssize_t * len,unsigned flags)137     pj_status_t recv( Pj_Async_Op *op_key,
138                       void *buf, pj_ssize_t *len,
139                       unsigned flags)
140     {
141         return pj_ioqueue_recv( key_, op_key,
142                                 buf, len, flags);
143     }
144 
145     //
146     // Start async recvfrom()
147     //
recvfrom(Pj_Async_Op * op_key,void * buf,pj_ssize_t * len,unsigned flags,Pj_Inet_Addr * addr)148     pj_status_t recvfrom( Pj_Async_Op *op_key,
149                           void *buf, pj_ssize_t *len, unsigned flags,
150                           Pj_Inet_Addr *addr)
151     {
152         addr->addrlen_ = sizeof(Pj_Inet_Addr);
153         return pj_ioqueue_recvfrom( key_, op_key, buf, len, flags,
154                                     addr, &addr->addrlen_ );
155     }
156 
157     //
158     // Start async send()
159     //
send(Pj_Async_Op * op_key,const void * data,pj_ssize_t * len,unsigned flags)160     pj_status_t send( Pj_Async_Op *op_key,
161                       const void *data, pj_ssize_t *len,
162                       unsigned flags)
163     {
164         return pj_ioqueue_send( key_, op_key, data, len, flags);
165     }
166 
167     //
168     // Start async sendto()
169     //
sendto(Pj_Async_Op * op_key,const void * data,pj_ssize_t * len,unsigned flags,const Pj_Inet_Addr & addr)170     pj_status_t sendto( Pj_Async_Op *op_key,
171                         const void *data, pj_ssize_t *len, unsigned flags,
172                         const Pj_Inet_Addr &addr)
173     {
174         return pj_ioqueue_sendto(key_, op_key, data, len, flags,
175                                  &addr, sizeof(addr));
176     }
177 
178 #if PJ_HAS_TCP
179     //
180     // Start async connect()
181     //
connect(const Pj_Inet_Addr & addr)182     pj_status_t connect(const Pj_Inet_Addr &addr)
183     {
184         return pj_ioqueue_connect(key_, &addr, sizeof(addr));
185     }
186 
187     //
188     // Start async accept().
189     //
accept(Pj_Async_Op * op_key,Pj_Socket * sock,Pj_Inet_Addr * local=NULL,Pj_Inet_Addr * remote=NULL)190     pj_status_t accept( Pj_Async_Op *op_key,
191                         Pj_Socket *sock,
192                         Pj_Inet_Addr *local = NULL,
193                         Pj_Inet_Addr *remote = NULL)
194     {
195         int *addrlen = local ? &local->addrlen_ : NULL;
196         return pj_ioqueue_accept( key_, op_key, &sock->sock_,
197                                   local, remote, addrlen );
198     }
199 
200 #endif
201 
202 protected:
203     //////////////////
204     // Overridables
205     //////////////////
206 
207     //
208     // Timeout callback.
209     //
on_timeout(int)210     virtual void on_timeout(int)
211     {
212     }
213 
214     //
215     // On read complete callback.
216     //
on_read_complete(Pj_Async_Op *,pj_ssize_t)217     virtual void on_read_complete( Pj_Async_Op*, pj_ssize_t)
218     {
219     }
220 
221     //
222     // On write complete callback.
223     //
on_write_complete(Pj_Async_Op *,pj_ssize_t)224     virtual void on_write_complete( Pj_Async_Op *, pj_ssize_t)
225     {
226     }
227 
228 #if PJ_HAS_TCP
229     //
230     // On connect complete callback.
231     //
on_connect_complete(pj_status_t)232     virtual void on_connect_complete(pj_status_t)
233     {
234     }
235 
236     //
237     // On new connection callback.
238     //
on_accept_complete(Pj_Async_Op *,pj_sock_t,pj_status_t)239     virtual void on_accept_complete( Pj_Async_Op*, pj_sock_t, pj_status_t)
240     {
241     }
242 
243 #endif
244 
245 
246 private:
247     pj_ioqueue_key_t *key_;
248     pj_timer_entry    timer_;
249 
250     friend class Pj_Proactor;
251     friend class Pj_Async_Op;
252 
253     //
254     // Static timer callback.
255     //
timer_callback(pj_timer_heap_t *,struct pj_timer_entry * entry)256     static void timer_callback( pj_timer_heap_t*,
257                                 struct pj_timer_entry *entry)
258     {
259         Pj_Event_Handler *handler =
260             (Pj_Event_Handler*) entry->user_data;
261 
262         handler->on_timeout(entry->id);
263     }
264 };
265 
is_pending()266 inline bool Pj_Async_Op::is_pending()
267 {
268     return pj_ioqueue_is_pending(handler_->key_, this) != 0;
269 }
270 
cancel(pj_ssize_t bytes_status)271 inline bool Pj_Async_Op::cancel(pj_ssize_t bytes_status)
272 {
273     return pj_ioqueue_post_completion(handler_->key_, this,
274                                       bytes_status) == PJ_SUCCESS;
275 }
276 
277 //////////////////////////////////////////////////////////////////////////////
278 // Proactor
279 //
280 class Pj_Proactor : public Pj_Object
281 {
282 public:
283     //
284     // Default constructor, initializes to NULL.
285     //
Pj_Proactor()286     Pj_Proactor()
287         : ioq_(NULL), th_(NULL)
288     {
289         cb_.on_read_complete    = &read_complete_cb;
290         cb_.on_write_complete   = &write_complete_cb;
291         cb_.on_accept_complete  = &accept_complete_cb;
292         cb_.on_connect_complete = &connect_complete_cb;
293     }
294 
295     //
296     // Construct proactor.
297     //
Pj_Proactor(Pj_Pool * pool,pj_size_t max_fd,pj_size_t max_timer_entries)298     Pj_Proactor( Pj_Pool *pool, pj_size_t max_fd,
299                  pj_size_t max_timer_entries )
300     : ioq_(NULL), th_(NULL)
301     {
302         cb_.on_read_complete    = &read_complete_cb;
303         cb_.on_write_complete   = &write_complete_cb;
304         cb_.on_accept_complete  = &accept_complete_cb;
305         cb_.on_connect_complete = &connect_complete_cb;
306 
307         create(pool, max_fd, max_timer_entries);
308     }
309 
310     //
311     // Destructor.
312     //
~Pj_Proactor()313     ~Pj_Proactor()
314     {
315         destroy();
316     }
317 
318     //
319     // Create proactor.
320     //
create(Pj_Pool * pool,pj_size_t max_fd,pj_size_t timer_entry_count)321     pj_status_t create( Pj_Pool *pool, pj_size_t max_fd,
322 			pj_size_t timer_entry_count)
323     {
324         pj_status_t status;
325 
326         destroy();
327 
328         status = pj_ioqueue_create(pool->pool_(), max_fd, &ioq_);
329         if (status != PJ_SUCCESS)
330             return status;
331 
332         status = pj_timer_heap_create(pool->pool_(),
333                                       timer_entry_count, &th_);
334         if (status != PJ_SUCCESS) {
335             pj_ioqueue_destroy(ioq_);
336             ioq_ = NULL;
337             return NULL;
338         }
339 
340         return status;
341     }
342 
343     //
344     // Destroy proactor.
345     //
destroy()346     void destroy()
347     {
348         if (ioq_) {
349             pj_ioqueue_destroy(ioq_);
350             ioq_ = NULL;
351         }
352         if (th_) {
353             pj_timer_heap_destroy(th_);
354             th_ = NULL;
355         }
356     }
357 
358     //
359     // Register handler.
360     // This will call handler->get_socket_handle()
361     //
register_socket_handler(Pj_Pool * pool,Pj_Event_Handler * handler)362     pj_status_t register_socket_handler(Pj_Pool *pool,
363                                         Pj_Event_Handler *handler)
364     {
365         return   pj_ioqueue_register_sock( pool->pool_(), ioq_,
366                                            handler->get_socket_handle(),
367                                            handler, &cb_, &handler->key_ );
368     }
369 
370     //
371     // Unregister handler.
372     //
unregister_handler(Pj_Event_Handler * handler)373     static void unregister_handler(Pj_Event_Handler *handler)
374     {
375         if (handler->key_) {
376             pj_ioqueue_unregister( handler->key_ );
377             handler->key_ = NULL;
378         }
379     }
380 
381     //
382     // Scheduler timer.
383     //
schedule_timer(Pj_Event_Handler * handler,const Pj_Time_Val & delay,int id=-1)384     bool schedule_timer( Pj_Event_Handler *handler,
385                          const Pj_Time_Val &delay,
386                          int id=-1)
387     {
388         return schedule_timer(th_, handler, delay, id);
389     }
390 
391     //
392     // Cancel timer.
393     //
cancel_timer(Pj_Event_Handler * handler)394     bool cancel_timer(Pj_Event_Handler *handler)
395     {
396         return pj_timer_heap_cancel(th_, &handler->timer_) == 1;
397     }
398 
399     //
400     // Handle events.
401     //
handle_events(Pj_Time_Val * max_timeout)402     int handle_events(Pj_Time_Val *max_timeout)
403     {
404         Pj_Time_Val timeout(0, 0);
405         int timer_count;
406 
407         timer_count = pj_timer_heap_poll( th_, &timeout );
408 
409         if (timeout.get_sec() < 0)
410             timeout.sec = PJ_MAXINT32;
411 
412         /* If caller specifies maximum time to wait, then compare the value
413          * with the timeout to wait from timer, and use the minimum value.
414          */
415         if (max_timeout && timeout >= *max_timeout) {
416 	    timeout = *max_timeout;
417         }
418 
419         /* Poll events in ioqueue. */
420         int ioqueue_count;
421 
422         ioqueue_count = pj_ioqueue_poll(ioq_, &timeout);
423         if (ioqueue_count < 0)
424 	    return ioqueue_count;
425 
426         return ioqueue_count + timer_count;
427     }
428 
429     //
430     // Get the internal ioqueue object.
431     //
get_io_queue()432     pj_ioqueue_t *get_io_queue()
433     {
434         return ioq_;
435     }
436 
437     //
438     // Get the internal timer heap object.
439     //
get_timer_heap()440     pj_timer_heap_t *get_timer_heap()
441     {
442         return th_;
443     }
444 
445 private:
446     pj_ioqueue_t *ioq_;
447     pj_timer_heap_t *th_;
448     pj_ioqueue_callback cb_;
449 
schedule_timer(pj_timer_heap_t * timer,Pj_Event_Handler * handler,const Pj_Time_Val & delay,int id=-1)450     static bool schedule_timer( pj_timer_heap_t *timer,
451                                 Pj_Event_Handler *handler,
452 				const Pj_Time_Val &delay,
453                                 int id=-1)
454     {
455         handler->timer_.id = id;
456         return pj_timer_heap_schedule(timer, &handler->timer_, &delay) == 0;
457     }
458 
459 
460     //
461     // Static read completion callback.
462     //
read_complete_cb(pj_ioqueue_key_t * key,pj_ioqueue_op_key_t * op_key,pj_ssize_t bytes_read)463     static void read_complete_cb( pj_ioqueue_key_t *key,
464                                   pj_ioqueue_op_key_t *op_key,
465                                   pj_ssize_t bytes_read)
466     {
467         Pj_Event_Handler *handler =
468 	    (Pj_Event_Handler*) pj_ioqueue_get_user_data(key);
469 
470         handler->on_read_complete((Pj_Async_Op*)op_key, bytes_read);
471     }
472 
473     //
474     // Static write completion callback.
475     //
write_complete_cb(pj_ioqueue_key_t * key,pj_ioqueue_op_key_t * op_key,pj_ssize_t bytes_sent)476     static void write_complete_cb(pj_ioqueue_key_t *key,
477                                   pj_ioqueue_op_key_t *op_key,
478                                   pj_ssize_t bytes_sent)
479     {
480         Pj_Event_Handler *handler =
481 	    (Pj_Event_Handler*) pj_ioqueue_get_user_data(key);
482 
483         handler->on_write_complete((Pj_Async_Op*)op_key, bytes_sent);
484     }
485 
486     //
487     // Static accept completion callback.
488     //
accept_complete_cb(pj_ioqueue_key_t * key,pj_ioqueue_op_key_t * op_key,pj_sock_t new_sock,pj_status_t status)489     static void accept_complete_cb(pj_ioqueue_key_t *key,
490                                    pj_ioqueue_op_key_t *op_key,
491                                    pj_sock_t new_sock,
492                                    pj_status_t status)
493     {
494         Pj_Event_Handler *handler =
495 	    (Pj_Event_Handler*) pj_ioqueue_get_user_data(key);
496 
497         handler->on_accept_complete((Pj_Async_Op*)op_key, new_sock, status);
498     }
499 
500     //
501     // Static connect completion callback.
502     //
connect_complete_cb(pj_ioqueue_key_t * key,pj_status_t status)503     static void connect_complete_cb(pj_ioqueue_key_t *key,
504                                     pj_status_t status)
505     {
506         Pj_Event_Handler *handler =
507 	    (Pj_Event_Handler*) pj_ioqueue_get_user_data(key);
508 
509         handler->on_connect_complete(status);
510     }
511 
512 };
513 
514 #endif	/* __PJPP_PROACTOR_HPP__ */
515 
516