/*
 * Copyright 2008-2014 Arsen Chaloyan
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * $Id: apt_poller_task.c 2224 2014-11-12 00:41:45Z achaloyan@gmail.com $
 */
18
19 #include "apt_poller_task.h"
20 #include "apt_task.h"
21 #include "apt_pool.h"
22 #include "apt_cyclic_queue.h"
23 #include "apt_log.h"
24
25
26 /** Poller task */
27 struct apt_poller_task_t {
28 apr_pool_t *pool;
29 apt_task_t *base;
30
31 void *obj;
32 apt_poll_signal_f signal_handler;
33
34 apr_thread_mutex_t *guard;
35 apt_cyclic_queue_t *msg_queue;
36 apt_pollset_t *pollset;
37 apt_timer_queue_t *timer_queue;
38
39 apr_pollfd_t *desc_arr;
40 apr_int32_t desc_count;
41 apr_int32_t desc_index;
42
43 };
44
45 static apt_bool_t apt_poller_task_msg_signal(apt_task_t *task, apt_task_msg_t *msg);
46 static apt_bool_t apt_poller_task_run(apt_task_t *task);
47 static apt_bool_t apt_poller_task_on_destroy(apt_task_t *task);
48
49
50 /** Create poller task */
apt_poller_task_create(apr_size_t max_pollset_size,apt_poll_signal_f signal_handler,void * obj,apt_task_msg_pool_t * msg_pool,apr_pool_t * pool)51 APT_DECLARE(apt_poller_task_t*) apt_poller_task_create(
52 apr_size_t max_pollset_size,
53 apt_poll_signal_f signal_handler,
54 void *obj,
55 apt_task_msg_pool_t *msg_pool,
56 apr_pool_t *pool)
57 {
58 apt_task_vtable_t *vtable;
59 apt_poller_task_t *task;
60
61 if(!signal_handler) {
62 return NULL;
63 }
64
65 task = apr_palloc(pool,sizeof(apt_poller_task_t));
66 task->pool = pool;
67 task->obj = obj;
68 task->pollset = NULL;
69 task->signal_handler = signal_handler;
70
71 task->pollset = apt_pollset_create((apr_uint32_t)max_pollset_size,pool);
72 if(!task->pollset) {
73 apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Create Pollset");
74 return NULL;
75 }
76
77 task->base = apt_task_create(task,msg_pool,pool);
78 if(!task->base) {
79 apt_pollset_destroy(task->pollset);
80 return NULL;
81 }
82
83 vtable = apt_task_vtable_get(task->base);
84 if(vtable) {
85 vtable->run = apt_poller_task_run;
86 vtable->destroy = apt_poller_task_on_destroy;
87 vtable->signal_msg = apt_poller_task_msg_signal;
88 }
89 apt_task_auto_ready_set(task->base,FALSE);
90
91 task->msg_queue = apt_cyclic_queue_create(CYCLIC_QUEUE_DEFAULT_SIZE);
92 apr_thread_mutex_create(&task->guard,APR_THREAD_MUTEX_UNNESTED,pool);
93
94 task->timer_queue = apt_timer_queue_create(pool);
95 task->desc_arr = NULL;
96 task->desc_count = 0;
97 task->desc_index = 0;
98 return task;
99 }
100
101 /** Destroy poller task */
apt_poller_task_destroy(apt_poller_task_t * task)102 APT_DECLARE(apt_bool_t) apt_poller_task_destroy(apt_poller_task_t *task)
103 {
104 return apt_task_destroy(task->base);
105 }
106
107 /** Cleanup poller task */
apt_poller_task_cleanup(apt_poller_task_t * task)108 APT_DECLARE(void) apt_poller_task_cleanup(apt_poller_task_t *task)
109 {
110 if(task->pollset) {
111 apt_pollset_destroy(task->pollset);
112 task->pollset = NULL;
113 }
114 if(task->guard) {
115 apr_thread_mutex_destroy(task->guard);
116 task->guard = NULL;
117 }
118 if(task->msg_queue) {
119 apt_cyclic_queue_destroy(task->msg_queue);
120 task->msg_queue = NULL;
121 }
122 }
123
124 /** Virtual destroy handler */
apt_poller_task_on_destroy(apt_task_t * base)125 static apt_bool_t apt_poller_task_on_destroy(apt_task_t *base)
126 {
127 apt_poller_task_t *task = apt_task_object_get(base);
128 apt_poller_task_cleanup(task);
129 return TRUE;
130 }
131
132
133 /** Start poller task */
apt_poller_task_start(apt_poller_task_t * task)134 APT_DECLARE(apt_bool_t) apt_poller_task_start(apt_poller_task_t *task)
135 {
136 return apt_task_start(task->base);
137 }
138
139 /** Terminate poller task */
apt_poller_task_terminate(apt_poller_task_t * task)140 APT_DECLARE(apt_bool_t) apt_poller_task_terminate(apt_poller_task_t *task)
141 {
142 return apt_task_terminate(task->base,TRUE);
143 }
144
145 /** Get task */
apt_poller_task_base_get(const apt_poller_task_t * task)146 APT_DECLARE(apt_task_t*) apt_poller_task_base_get(const apt_poller_task_t *task)
147 {
148 return task->base;
149 }
150
151 /** Get task vtable */
apt_poller_task_vtable_get(const apt_poller_task_t * task)152 APT_DECLARE(apt_task_vtable_t*) apt_poller_task_vtable_get(const apt_poller_task_t *task)
153 {
154 return apt_task_vtable_get(task->base);
155 }
156
157 /** Get external object */
apt_poller_task_object_get(const apt_poller_task_t * task)158 APT_DECLARE(void*) apt_poller_task_object_get(const apt_poller_task_t *task)
159 {
160 return task->obj;
161 }
162
163 /** Add descriptor to pollset */
apt_poller_task_descriptor_add(const apt_poller_task_t * task,const apr_pollfd_t * descriptor)164 APT_DECLARE(apt_bool_t) apt_poller_task_descriptor_add(const apt_poller_task_t *task, const apr_pollfd_t *descriptor)
165 {
166 if(task->pollset) {
167 return apt_pollset_add(task->pollset,descriptor);
168 }
169 return FALSE;
170 }
171
172 /** Remove descriptor from pollset */
apt_poller_task_descriptor_remove(const apt_poller_task_t * task,const apr_pollfd_t * descriptor)173 APT_DECLARE(apt_bool_t) apt_poller_task_descriptor_remove(const apt_poller_task_t *task, const apr_pollfd_t *descriptor)
174 {
175 if(task->pollset) {
176 apr_int32_t i = task->desc_index + 1;
177 for(; i < task->desc_count; i++) {
178 apr_pollfd_t *cur_descriptor = &task->desc_arr[i];
179 if(cur_descriptor->client_data == descriptor->client_data) {
180 cur_descriptor->client_data = NULL;
181 }
182 }
183 return apt_pollset_remove(task->pollset,descriptor);
184 }
185 return FALSE;
186 }
187
188 /** Create timer */
apt_poller_task_timer_create(apt_poller_task_t * task,apt_timer_proc_f proc,void * obj,apr_pool_t * pool)189 APT_DECLARE(apt_timer_t*) apt_poller_task_timer_create(
190 apt_poller_task_t *task,
191 apt_timer_proc_f proc,
192 void *obj,
193 apr_pool_t *pool)
194 {
195 return apt_timer_create(task->timer_queue,proc,obj,pool);
196 }
197
apt_poller_task_wakeup_process(apt_poller_task_t * task)198 static apt_bool_t apt_poller_task_wakeup_process(apt_poller_task_t *task)
199 {
200 apt_bool_t running = TRUE;
201 apt_task_msg_t *msg;
202
203 do {
204 apr_thread_mutex_lock(task->guard);
205 msg = apt_cyclic_queue_pop(task->msg_queue);
206 apr_thread_mutex_unlock(task->guard);
207 if(msg) {
208 apt_task_msg_process(task->base,msg);
209 }
210 else {
211 running = FALSE;
212 }
213 }
214 while(running == TRUE);
215 return TRUE;
216 }
217
apt_poller_task_run(apt_task_t * base)218 static apt_bool_t apt_poller_task_run(apt_task_t *base)
219 {
220 apt_poller_task_t *task = apt_task_object_get(base);
221 apt_bool_t *running;
222 apr_status_t status;
223 apr_interval_time_t timeout;
224 apr_uint32_t queue_timeout;
225 apr_time_t time_now, time_last = 0;
226 const char *task_name;
227
228 if(!task) {
229 apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Start Poller Task");
230 return FALSE;
231 }
232 task_name = apt_task_name_get(task->base);
233
234 running = apt_task_running_flag_get(task->base);
235 if(!running) {
236 return FALSE;
237 }
238
239 /* explicitly indicate task is ready to process messages */
240 apt_task_ready(task->base);
241
242 while(*running) {
243 if(apt_timer_queue_timeout_get(task->timer_queue,&queue_timeout) == TRUE) {
244 timeout = (apr_interval_time_t)queue_timeout * 1000;
245 time_last = apr_time_now();
246 apt_log(APT_LOG_MARK,APT_PRIO_DEBUG,"Wait for Messages [%s] timeout [%u]",
247 task_name, queue_timeout);
248 }
249 else {
250 timeout = -1;
251 apt_log(APT_LOG_MARK,APT_PRIO_DEBUG,"Wait for Messages [%s]",task_name);
252 }
253 status = apt_pollset_poll(task->pollset, timeout, &task->desc_count, (const apr_pollfd_t **) &task->desc_arr);
254 if(status != APR_SUCCESS && status != APR_TIMEUP) {
255 apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Poll [%s] status: %d",task_name,status);
256 continue;
257 }
258 for(task->desc_index = 0; task->desc_index < task->desc_count; task->desc_index++) {
259 const apr_pollfd_t *descriptor = &task->desc_arr[task->desc_index];
260 if(apt_pollset_is_wakeup(task->pollset,descriptor)) {
261 apt_log(APT_LOG_MARK,APT_PRIO_DEBUG,"Process Poller Wakeup [%s]",task_name);
262 apt_poller_task_wakeup_process(task);
263 if(*running == FALSE) {
264 break;
265 }
266 continue;
267 }
268
269 apt_log(APT_LOG_MARK,APT_PRIO_DEBUG,"Process Signalled Descriptor [%s]",task_name);
270 task->signal_handler(task->obj,descriptor);
271 }
272
273 if(timeout != -1) {
274 time_now = apr_time_now();
275 if(time_now > time_last) {
276 apt_timer_queue_advance(task->timer_queue,(apr_uint32_t)((time_now - time_last)/1000));
277 }
278 }
279 }
280
281 return TRUE;
282 }
283
apt_poller_task_msg_signal(apt_task_t * base,apt_task_msg_t * msg)284 static apt_bool_t apt_poller_task_msg_signal(apt_task_t *base, apt_task_msg_t *msg)
285 {
286 apt_bool_t status;
287 apt_poller_task_t *task = apt_task_object_get(base);
288 apr_thread_mutex_lock(task->guard);
289 status = apt_cyclic_queue_push(task->msg_queue,msg);
290 apr_thread_mutex_unlock(task->guard);
291 if(apt_pollset_wakeup(task->pollset) != TRUE) {
292 apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Signal Control Message");
293 status = FALSE;
294 }
295 return status;
296 }
297