1 /*
2  * Copyright 2008-2014 Arsen Chaloyan
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  * $Id: apt_poller_task.c 2224 2014-11-12 00:41:45Z achaloyan@gmail.com $
17  */
18 
19 #include "apt_poller_task.h"
20 #include "apt_task.h"
21 #include "apt_pool.h"
22 #include "apt_cyclic_queue.h"
23 #include "apt_log.h"
24 
25 
26 /** Poller task */
27 struct apt_poller_task_t {
28 	apr_pool_t         *pool;
29 	apt_task_t         *base;
30 
31 	void               *obj;
32 	apt_poll_signal_f   signal_handler;
33 
34 	apr_thread_mutex_t *guard;
35 	apt_cyclic_queue_t *msg_queue;
36 	apt_pollset_t      *pollset;
37 	apt_timer_queue_t  *timer_queue;
38 
39 	apr_pollfd_t       *desc_arr;
40 	apr_int32_t         desc_count;
41 	apr_int32_t         desc_index;
42 
43 };
44 
45 static apt_bool_t apt_poller_task_msg_signal(apt_task_t *task, apt_task_msg_t *msg);
46 static apt_bool_t apt_poller_task_run(apt_task_t *task);
47 static apt_bool_t apt_poller_task_on_destroy(apt_task_t *task);
48 
49 
50 /** Create poller task */
apt_poller_task_create(apr_size_t max_pollset_size,apt_poll_signal_f signal_handler,void * obj,apt_task_msg_pool_t * msg_pool,apr_pool_t * pool)51 APT_DECLARE(apt_poller_task_t*) apt_poller_task_create(
52 										apr_size_t max_pollset_size,
53 										apt_poll_signal_f signal_handler,
54 										void *obj,
55 										apt_task_msg_pool_t *msg_pool,
56 										apr_pool_t *pool)
57 {
58 	apt_task_vtable_t *vtable;
59 	apt_poller_task_t *task;
60 
61 	if(!signal_handler) {
62 		return NULL;
63 	}
64 
65 	task = apr_palloc(pool,sizeof(apt_poller_task_t));
66 	task->pool = pool;
67 	task->obj = obj;
68 	task->pollset = NULL;
69 	task->signal_handler = signal_handler;
70 
71 	task->pollset = apt_pollset_create((apr_uint32_t)max_pollset_size,pool);
72 	if(!task->pollset) {
73 		apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Create Pollset");
74 		return NULL;
75 	}
76 
77 	task->base = apt_task_create(task,msg_pool,pool);
78 	if(!task->base) {
79 		apt_pollset_destroy(task->pollset);
80 		return NULL;
81 	}
82 
83 	vtable = apt_task_vtable_get(task->base);
84 	if(vtable) {
85 		vtable->run = apt_poller_task_run;
86 		vtable->destroy = apt_poller_task_on_destroy;
87 		vtable->signal_msg = apt_poller_task_msg_signal;
88 	}
89 	apt_task_auto_ready_set(task->base,FALSE);
90 
91 	task->msg_queue = apt_cyclic_queue_create(CYCLIC_QUEUE_DEFAULT_SIZE);
92 	apr_thread_mutex_create(&task->guard,APR_THREAD_MUTEX_UNNESTED,pool);
93 
94 	task->timer_queue = apt_timer_queue_create(pool);
95 	task->desc_arr = NULL;
96 	task->desc_count = 0;
97 	task->desc_index = 0;
98 	return task;
99 }
100 
101 /** Destroy poller task */
apt_poller_task_destroy(apt_poller_task_t * task)102 APT_DECLARE(apt_bool_t) apt_poller_task_destroy(apt_poller_task_t *task)
103 {
104 	return apt_task_destroy(task->base);
105 }
106 
107 /** Cleanup poller task */
apt_poller_task_cleanup(apt_poller_task_t * task)108 APT_DECLARE(void) apt_poller_task_cleanup(apt_poller_task_t *task)
109 {
110 	if(task->pollset) {
111 		apt_pollset_destroy(task->pollset);
112 		task->pollset = NULL;
113 	}
114 	if(task->guard) {
115 		apr_thread_mutex_destroy(task->guard);
116 		task->guard = NULL;
117 	}
118 	if(task->msg_queue) {
119 		apt_cyclic_queue_destroy(task->msg_queue);
120 		task->msg_queue = NULL;
121 	}
122 }
123 
124 /** Virtual destroy handler */
apt_poller_task_on_destroy(apt_task_t * base)125 static apt_bool_t apt_poller_task_on_destroy(apt_task_t *base)
126 {
127 	apt_poller_task_t *task = apt_task_object_get(base);
128 	apt_poller_task_cleanup(task);
129 	return TRUE;
130 }
131 
132 
133 /** Start poller task */
apt_poller_task_start(apt_poller_task_t * task)134 APT_DECLARE(apt_bool_t) apt_poller_task_start(apt_poller_task_t *task)
135 {
136 	return apt_task_start(task->base);
137 }
138 
139 /** Terminate poller task */
apt_poller_task_terminate(apt_poller_task_t * task)140 APT_DECLARE(apt_bool_t) apt_poller_task_terminate(apt_poller_task_t *task)
141 {
142 	return apt_task_terminate(task->base,TRUE);
143 }
144 
145 /** Get task */
apt_poller_task_base_get(const apt_poller_task_t * task)146 APT_DECLARE(apt_task_t*) apt_poller_task_base_get(const apt_poller_task_t *task)
147 {
148 	return task->base;
149 }
150 
151 /** Get task vtable */
apt_poller_task_vtable_get(const apt_poller_task_t * task)152 APT_DECLARE(apt_task_vtable_t*) apt_poller_task_vtable_get(const apt_poller_task_t *task)
153 {
154 	return apt_task_vtable_get(task->base);
155 }
156 
157 /** Get external object */
apt_poller_task_object_get(const apt_poller_task_t * task)158 APT_DECLARE(void*) apt_poller_task_object_get(const apt_poller_task_t *task)
159 {
160 	return task->obj;
161 }
162 
163 /** Add descriptor to pollset */
apt_poller_task_descriptor_add(const apt_poller_task_t * task,const apr_pollfd_t * descriptor)164 APT_DECLARE(apt_bool_t) apt_poller_task_descriptor_add(const apt_poller_task_t *task, const apr_pollfd_t *descriptor)
165 {
166 	if(task->pollset) {
167 		return apt_pollset_add(task->pollset,descriptor);
168 	}
169 	return FALSE;
170 }
171 
172 /** Remove descriptor from pollset */
apt_poller_task_descriptor_remove(const apt_poller_task_t * task,const apr_pollfd_t * descriptor)173 APT_DECLARE(apt_bool_t) apt_poller_task_descriptor_remove(const apt_poller_task_t *task, const apr_pollfd_t *descriptor)
174 {
175 	if(task->pollset) {
176 		apr_int32_t i = task->desc_index + 1;
177 		for(; i < task->desc_count; i++) {
178 			apr_pollfd_t *cur_descriptor = &task->desc_arr[i];
179 			if(cur_descriptor->client_data == descriptor->client_data) {
180 				cur_descriptor->client_data = NULL;
181 			}
182 		}
183 		return apt_pollset_remove(task->pollset,descriptor);
184 	}
185 	return FALSE;
186 }
187 
188 /** Create timer */
apt_poller_task_timer_create(apt_poller_task_t * task,apt_timer_proc_f proc,void * obj,apr_pool_t * pool)189 APT_DECLARE(apt_timer_t*) apt_poller_task_timer_create(
190 									apt_poller_task_t *task,
191 									apt_timer_proc_f proc,
192 									void *obj,
193 									apr_pool_t *pool)
194 {
195 	return apt_timer_create(task->timer_queue,proc,obj,pool);
196 }
197 
apt_poller_task_wakeup_process(apt_poller_task_t * task)198 static apt_bool_t apt_poller_task_wakeup_process(apt_poller_task_t *task)
199 {
200 	apt_bool_t running = TRUE;
201 	apt_task_msg_t *msg;
202 
203 	do {
204 		apr_thread_mutex_lock(task->guard);
205 		msg = apt_cyclic_queue_pop(task->msg_queue);
206 		apr_thread_mutex_unlock(task->guard);
207 		if(msg) {
208 			apt_task_msg_process(task->base,msg);
209 		}
210 		else {
211 			running = FALSE;
212 		}
213 	}
214 	while(running == TRUE);
215 	return TRUE;
216 }
217 
apt_poller_task_run(apt_task_t * base)218 static apt_bool_t apt_poller_task_run(apt_task_t *base)
219 {
220 	apt_poller_task_t *task = apt_task_object_get(base);
221 	apt_bool_t *running;
222 	apr_status_t status;
223 	apr_interval_time_t timeout;
224 	apr_uint32_t queue_timeout;
225 	apr_time_t time_now, time_last = 0;
226 	const char *task_name;
227 
228 	if(!task) {
229 		apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Start Poller Task");
230 		return FALSE;
231 	}
232 	task_name = apt_task_name_get(task->base);
233 
234 	running = apt_task_running_flag_get(task->base);
235 	if(!running) {
236 		return FALSE;
237 	}
238 
239 	/* explicitly indicate task is ready to process messages */
240 	apt_task_ready(task->base);
241 
242 	while(*running) {
243 		if(apt_timer_queue_timeout_get(task->timer_queue,&queue_timeout) == TRUE) {
244 			timeout = (apr_interval_time_t)queue_timeout * 1000;
245 			time_last = apr_time_now();
246 			apt_log(APT_LOG_MARK,APT_PRIO_DEBUG,"Wait for Messages [%s] timeout [%u]",
247 				task_name, queue_timeout);
248 		}
249 		else {
250 			timeout = -1;
251 			apt_log(APT_LOG_MARK,APT_PRIO_DEBUG,"Wait for Messages [%s]",task_name);
252 		}
253 		status = apt_pollset_poll(task->pollset, timeout, &task->desc_count, (const apr_pollfd_t **) &task->desc_arr);
254 		if(status != APR_SUCCESS && status != APR_TIMEUP) {
255 			apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Poll [%s] status: %d",task_name,status);
256 			continue;
257 		}
258 		for(task->desc_index = 0; task->desc_index < task->desc_count; task->desc_index++) {
259 			const apr_pollfd_t *descriptor = &task->desc_arr[task->desc_index];
260 			if(apt_pollset_is_wakeup(task->pollset,descriptor)) {
261 				apt_log(APT_LOG_MARK,APT_PRIO_DEBUG,"Process Poller Wakeup [%s]",task_name);
262 				apt_poller_task_wakeup_process(task);
263 				if(*running == FALSE) {
264 					break;
265 				}
266 				continue;
267 			}
268 
269 			apt_log(APT_LOG_MARK,APT_PRIO_DEBUG,"Process Signalled Descriptor [%s]",task_name);
270 			task->signal_handler(task->obj,descriptor);
271 		}
272 
273 		if(timeout != -1) {
274 			time_now = apr_time_now();
275 			if(time_now > time_last) {
276 				apt_timer_queue_advance(task->timer_queue,(apr_uint32_t)((time_now - time_last)/1000));
277 			}
278 		}
279 	}
280 
281 	return TRUE;
282 }
283 
apt_poller_task_msg_signal(apt_task_t * base,apt_task_msg_t * msg)284 static apt_bool_t apt_poller_task_msg_signal(apt_task_t *base, apt_task_msg_t *msg)
285 {
286 	apt_bool_t status;
287 	apt_poller_task_t *task = apt_task_object_get(base);
288 	apr_thread_mutex_lock(task->guard);
289 	status = apt_cyclic_queue_push(task->msg_queue,msg);
290 	apr_thread_mutex_unlock(task->guard);
291 	if(apt_pollset_wakeup(task->pollset) != TRUE) {
292 		apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Signal Control Message");
293 		status = FALSE;
294 	}
295 	return status;
296 }
297