1 /*
2  * Copyright 2008-2014 Arsen Chaloyan
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  * $Id: mpf_engine.c 2226 2014-11-12 00:47:40Z achaloyan@gmail.com $
17  */
18 
19 #include "mpf_engine.h"
20 #include "mpf_context.h"
21 #include "mpf_termination.h"
22 #include "mpf_stream.h"
23 #include "mpf_scheduler.h"
24 #include "mpf_codec_descriptor.h"
25 #include "mpf_codec_manager.h"
26 #include "apt_obj_list.h"
27 #include "apt_cyclic_queue.h"
28 #include "apt_log.h"
29 
30 #define MPF_TIMER_RESOLUTION 100 /* 100 ms */
31 
32 struct mpf_engine_t {
33 	apr_pool_t                *pool;
34 	apt_task_t                *task;
35 	apt_task_msg_type_e        task_msg_type;
36 	apr_thread_mutex_t        *request_queue_guard;
37 	apt_cyclic_queue_t        *request_queue;
38 	mpf_context_factory_t     *context_factory;
39 	mpf_scheduler_t           *scheduler;
40 	apt_timer_queue_t         *timer_queue;
41 	const mpf_codec_manager_t *codec_manager;
42 };
43 
44 static void mpf_engine_main(mpf_scheduler_t *scheduler, void *obj);
45 static void mpf_engine_timer_proc(mpf_scheduler_t *scheduler, void *obj);
46 static apt_bool_t mpf_engine_destroy(apt_task_t *task);
47 static apt_bool_t mpf_engine_start(apt_task_t *task);
48 static apt_bool_t mpf_engine_terminate(apt_task_t *task);
49 static apt_bool_t mpf_engine_msg_signal(apt_task_t *task, apt_task_msg_t *msg);
50 static apt_bool_t mpf_engine_msg_process(apt_task_t *task, apt_task_msg_t *msg);
51 
52 
53 mpf_codec_t* mpf_codec_l16_create(apr_pool_t *pool);
54 mpf_codec_t* mpf_codec_g711u_create(apr_pool_t *pool);
55 mpf_codec_t* mpf_codec_g711a_create(apr_pool_t *pool);
56 
mpf_engine_create(const char * id,apr_pool_t * pool)57 MPF_DECLARE(mpf_engine_t*) mpf_engine_create(const char *id, apr_pool_t *pool)
58 {
59 	apt_task_vtable_t *vtable;
60 	apt_task_msg_pool_t *msg_pool;
61 	mpf_engine_t *engine = apr_palloc(pool,sizeof(mpf_engine_t));
62 	engine->pool = pool;
63 	engine->request_queue = NULL;
64 	engine->context_factory = NULL;
65 	engine->codec_manager = NULL;
66 
67 	msg_pool = apt_task_msg_pool_create_dynamic(sizeof(mpf_message_container_t),pool);
68 
69 	apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Create Media Engine [%s]",id);
70 	engine->task = apt_task_create(engine,msg_pool,pool);
71 	if(!engine->task) {
72 		return NULL;
73 	}
74 
75 	apt_task_name_set(engine->task,id);
76 
77 	vtable = apt_task_vtable_get(engine->task);
78 	if(vtable) {
79 		vtable->destroy = mpf_engine_destroy;
80 		vtable->start = mpf_engine_start;
81 		vtable->terminate = mpf_engine_terminate;
82 		vtable->signal_msg = mpf_engine_msg_signal;
83 		vtable->process_msg = mpf_engine_msg_process;
84 	}
85 
86 	engine->task_msg_type = TASK_MSG_USER;
87 
88 	engine->context_factory = mpf_context_factory_create(engine->pool);
89 	engine->request_queue = apt_cyclic_queue_create(CYCLIC_QUEUE_DEFAULT_SIZE);
90 	apr_thread_mutex_create(&engine->request_queue_guard,APR_THREAD_MUTEX_UNNESTED,engine->pool);
91 
92 	engine->scheduler = mpf_scheduler_create(engine->pool);
93 	mpf_scheduler_media_clock_set(engine->scheduler,CODEC_FRAME_TIME_BASE,mpf_engine_main,engine);
94 
95 	engine->timer_queue = apt_timer_queue_create(engine->pool);
96 	mpf_scheduler_timer_clock_set(engine->scheduler,MPF_TIMER_RESOLUTION,mpf_engine_timer_proc,engine);
97 	return engine;
98 }
99 
mpf_engine_context_create(mpf_engine_t * engine,const char * name,void * obj,apr_size_t max_termination_count,apr_pool_t * pool)100 MPF_DECLARE(mpf_context_t*) mpf_engine_context_create(
101 								mpf_engine_t *engine,
102 								const char *name,
103 								void *obj,
104 								apr_size_t max_termination_count,
105 								apr_pool_t *pool)
106 {
107 	return mpf_context_create(engine->context_factory,name,obj,max_termination_count,pool);
108 }
109 
mpf_engine_context_destroy(mpf_context_t * context)110 MPF_DECLARE(apt_bool_t) mpf_engine_context_destroy(mpf_context_t *context)
111 {
112 	return mpf_context_destroy(context);
113 }
114 
mpf_engine_context_object_get(const mpf_context_t * context)115 MPF_DECLARE(void*) mpf_engine_context_object_get(const mpf_context_t *context)
116 {
117 	return mpf_context_object_get(context);
118 }
119 
mpf_task_get(const mpf_engine_t * engine)120 MPF_DECLARE(apt_task_t*) mpf_task_get(const mpf_engine_t *engine)
121 {
122 	return engine->task;
123 }
124 
mpf_engine_task_msg_type_set(mpf_engine_t * engine,apt_task_msg_type_e type)125 MPF_DECLARE(void) mpf_engine_task_msg_type_set(mpf_engine_t *engine, apt_task_msg_type_e type)
126 {
127 	engine->task_msg_type = type;
128 }
129 
mpf_engine_message_get(mpf_engine_t * engine,mpf_task_msg_t ** task_msg)130 static mpf_message_t* mpf_engine_message_get(mpf_engine_t *engine, mpf_task_msg_t **task_msg)
131 {
132 	mpf_message_container_t *container;
133 	mpf_message_t *mpf_message;
134 	if(*task_msg) {
135 		container = (mpf_message_container_t*) (*task_msg)->data;
136 		if(container->count >= MAX_MPF_MESSAGE_COUNT) {
137 			/* container has been already filled,
138 			implicitly send the requests and get new task message */
139 			mpf_engine_message_send(engine,task_msg);
140 			return mpf_engine_message_get(engine,task_msg);
141 		}
142 	}
143 	else {
144 		*task_msg = apt_task_msg_get(engine->task);
145 		container = (mpf_message_container_t*) (*task_msg)->data;
146 		container->count = 0;
147 	}
148 
149 	mpf_message = &container->messages[container->count];
150 	container->count++;
151 	return mpf_message;
152 }
153 
154 
mpf_engine_termination_message_add(mpf_engine_t * engine,mpf_command_type_e command_id,mpf_context_t * context,mpf_termination_t * termination,void * descriptor,mpf_task_msg_t ** task_msg)155 MPF_DECLARE(apt_bool_t) mpf_engine_termination_message_add(
156 							mpf_engine_t *engine,
157 							mpf_command_type_e command_id,
158 							mpf_context_t *context,
159 							mpf_termination_t *termination,
160 							void *descriptor,
161 							mpf_task_msg_t **task_msg)
162 {
163 	mpf_message_t *mpf_message = mpf_engine_message_get(engine,task_msg);
164 	if(!mpf_message) {
165 		return FALSE;
166 	}
167 	mpf_message->message_type = MPF_MESSAGE_TYPE_REQUEST;
168 	mpf_message->command_id = command_id;
169 	mpf_message->context = context;
170 	mpf_message->termination = termination;
171 	mpf_message->assoc_termination = NULL;
172 	mpf_message->descriptor = descriptor;
173 	return TRUE;
174 }
175 
mpf_engine_assoc_message_add(mpf_engine_t * engine,mpf_command_type_e command_id,mpf_context_t * context,mpf_termination_t * termination,mpf_termination_t * assoc_termination,mpf_task_msg_t ** task_msg)176 MPF_DECLARE(apt_bool_t) mpf_engine_assoc_message_add(
177 							mpf_engine_t *engine,
178 							mpf_command_type_e command_id,
179 							mpf_context_t *context,
180 							mpf_termination_t *termination,
181 							mpf_termination_t *assoc_termination,
182 							mpf_task_msg_t **task_msg)
183 {
184 	mpf_message_t *mpf_message = mpf_engine_message_get(engine,task_msg);
185 	if(!mpf_message) {
186 		return FALSE;
187 	}
188 	mpf_message->message_type = MPF_MESSAGE_TYPE_REQUEST;
189 	mpf_message->command_id = command_id;
190 	mpf_message->context = context;
191 	mpf_message->termination = termination;
192 	mpf_message->assoc_termination = assoc_termination;
193 	mpf_message->descriptor = NULL;
194 	return TRUE;
195 }
196 
mpf_engine_topology_message_add(mpf_engine_t * engine,mpf_command_type_e command_id,mpf_context_t * context,mpf_task_msg_t ** task_msg)197 MPF_DECLARE(apt_bool_t) mpf_engine_topology_message_add(
198 							mpf_engine_t *engine,
199 							mpf_command_type_e command_id,
200 							mpf_context_t *context,
201 							mpf_task_msg_t **task_msg)
202 {
203 	mpf_message_t *mpf_message = mpf_engine_message_get(engine,task_msg);
204 	if(!mpf_message) {
205 		return FALSE;
206 	}
207 	mpf_message->message_type = MPF_MESSAGE_TYPE_REQUEST;
208 	mpf_message->command_id = command_id;
209 	mpf_message->context = context;
210 	mpf_message->termination = NULL;
211 	mpf_message->assoc_termination = NULL;
212 	mpf_message->descriptor = NULL;
213 	return TRUE;
214 }
215 
mpf_engine_message_send(mpf_engine_t * engine,mpf_task_msg_t ** task_msg)216 MPF_DECLARE(apt_bool_t) mpf_engine_message_send(mpf_engine_t *engine, mpf_task_msg_t **task_msg)
217 {
218 	apt_bool_t status = FALSE;
219 	if(*task_msg) {
220 		status = apt_task_msg_signal(engine->task,*task_msg);
221 		*task_msg = NULL;
222 	}
223 	return status;
224 }
225 
mpf_engine_destroy(apt_task_t * task)226 static apt_bool_t mpf_engine_destroy(apt_task_t *task)
227 {
228 	mpf_engine_t *engine = apt_task_object_get(task);
229 
230 	apt_timer_queue_destroy(engine->timer_queue);
231 	mpf_scheduler_destroy(engine->scheduler);
232 	mpf_context_factory_destroy(engine->context_factory);
233 	apt_cyclic_queue_destroy(engine->request_queue);
234 	apr_thread_mutex_destroy(engine->request_queue_guard);
235 	return TRUE;
236 }
237 
mpf_engine_start(apt_task_t * task)238 static apt_bool_t mpf_engine_start(apt_task_t *task)
239 {
240 	mpf_engine_t *engine = apt_task_object_get(task);
241 
242 	mpf_scheduler_start(engine->scheduler);
243 	apt_task_start_request_process(task);
244 	return TRUE;
245 }
246 
mpf_engine_terminate(apt_task_t * task)247 static apt_bool_t mpf_engine_terminate(apt_task_t *task)
248 {
249 	mpf_engine_t *engine = apt_task_object_get(task);
250 
251 	mpf_scheduler_stop(engine->scheduler);
252 	apt_task_terminate_request_process(task);
253 	return TRUE;
254 }
255 
mpf_engine_event_raise(mpf_termination_t * termination,int event_id,void * descriptor)256 static apt_bool_t mpf_engine_event_raise(mpf_termination_t *termination, int event_id, void *descriptor)
257 {
258 	apt_task_msg_t *task_msg;
259 	mpf_message_container_t *event_msg;
260 	mpf_message_t *mpf_message;
261 	mpf_engine_t *engine;
262 	engine = termination->media_engine;
263 	if(!engine) {
264 		return FALSE;
265 	}
266 
267 	task_msg = apt_task_msg_get(engine->task);
268 	if(!task_msg) {
269 		return FALSE;
270 	}
271 	task_msg->type = engine->task_msg_type;
272 	event_msg = (mpf_message_container_t*) task_msg->data;
273 	mpf_message = event_msg->messages;
274 	event_msg->count = 1;
275 
276 	mpf_message->command_id = event_id;
277 	mpf_message->message_type = MPF_MESSAGE_TYPE_EVENT;
278 	mpf_message->status_code = MPF_STATUS_CODE_SUCCESS;
279 	mpf_message->context = NULL;
280 	mpf_message->termination = termination;
281 	mpf_message->descriptor = descriptor;
282 
283 	return apt_task_msg_parent_signal(engine->task,task_msg);
284 }
285 
mpf_engine_msg_signal(apt_task_t * task,apt_task_msg_t * msg)286 static apt_bool_t mpf_engine_msg_signal(apt_task_t *task, apt_task_msg_t *msg)
287 {
288 	mpf_engine_t *engine = apt_task_object_get(task);
289 
290 	apr_thread_mutex_lock(engine->request_queue_guard);
291 	if(apt_cyclic_queue_push(engine->request_queue,msg) == FALSE) {
292 		apt_log(APT_LOG_MARK,APT_PRIO_ERROR,"MPF Request Queue is Full [%s]",apt_task_name_get(task));
293 	}
294 	apr_thread_mutex_unlock(engine->request_queue_guard);
295 	return TRUE;
296 }
297 
mpf_engine_msg_process(apt_task_t * task,apt_task_msg_t * msg)298 static apt_bool_t mpf_engine_msg_process(apt_task_t *task, apt_task_msg_t *msg)
299 {
300 	apr_size_t i;
301 	mpf_engine_t *engine = apt_task_object_get(task);
302 	apt_task_msg_t *response_msg;
303 	mpf_message_container_t *response;
304 	mpf_message_t *mpf_response;
305 	mpf_context_t *context;
306 	mpf_termination_t *termination;
307 	const mpf_message_t *mpf_request;
308 	const mpf_message_container_t *request = (const mpf_message_container_t*) msg->data;
309 
310 	response_msg = apt_task_msg_get(engine->task);
311 	if(!response_msg) {
312 		return FALSE;
313 	}
314 	response_msg->type = engine->task_msg_type;
315 	response = (mpf_message_container_t*) response_msg->data;
316 	*response = *request;
317 	for(i=0; i<request->count; i++) {
318 		mpf_request = &request->messages[i];
319 		mpf_response = &response->messages[i];
320 
321 		if(mpf_request->message_type != MPF_MESSAGE_TYPE_REQUEST) {
322 			apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Invalid MPF Message Type [%d]",mpf_request->message_type);
323 			continue;
324 		}
325 
326 		mpf_response->message_type = MPF_MESSAGE_TYPE_RESPONSE;
327 		mpf_response->status_code = MPF_STATUS_CODE_SUCCESS;
328 		context = mpf_request->context;
329 		termination = mpf_request->termination;
330 		switch(mpf_request->command_id) {
331 			case MPF_ADD_TERMINATION:
332 			{
333 				termination->media_engine = engine;
334 				termination->event_handler = mpf_engine_event_raise;
335 				termination->codec_manager = engine->codec_manager;
336 				termination->timer_queue = engine->timer_queue;
337 
338 				mpf_termination_add(termination,mpf_request->descriptor);
339 				if(mpf_context_termination_add(context,termination) == FALSE) {
340 					mpf_termination_subtract(termination);
341 					mpf_response->status_code = MPF_STATUS_CODE_FAILURE;
342 					break;
343 				}
344 				break;
345 			}
346 			case MPF_MODIFY_TERMINATION:
347 			{
348 				mpf_termination_modify(termination,mpf_request->descriptor);
349 				break;
350 			}
351 			case MPF_SUBTRACT_TERMINATION:
352 			{
353 				if(mpf_context_termination_subtract(context,termination) == FALSE) {
354 					mpf_response->status_code = MPF_STATUS_CODE_FAILURE;
355 					break;
356 				}
357 				mpf_termination_subtract(termination);
358 				break;
359 			}
360 			case MPF_ADD_ASSOCIATION:
361 			{
362 				mpf_context_association_add(context,termination,mpf_request->assoc_termination);
363 				break;
364 			}
365 			case MPF_REMOVE_ASSOCIATION:
366 			{
367 				mpf_context_association_remove(context,termination,mpf_request->assoc_termination);
368 				break;
369 			}
370 			case MPF_RESET_ASSOCIATIONS:
371 			{
372 				mpf_context_associations_reset(context);
373 				break;
374 			}
375 			case MPF_APPLY_TOPOLOGY:
376 			{
377 				mpf_context_topology_apply(context);
378 				break;
379 			}
380 			case MPF_DESTROY_TOPOLOGY:
381 			{
382 				mpf_context_topology_destroy(context);
383 				break;
384 			}
385 			default:
386 			{
387 				mpf_response->status_code = MPF_STATUS_CODE_FAILURE;
388 			}
389 		}
390 	}
391 
392 	return apt_task_msg_parent_signal(engine->task,response_msg);
393 }
394 
mpf_engine_main(mpf_scheduler_t * scheduler,void * obj)395 static void mpf_engine_main(mpf_scheduler_t *scheduler, void *obj)
396 {
397 	mpf_engine_t *engine = obj;
398 	apt_task_msg_t *msg;
399 
400 	/* process request queue */
401 	apr_thread_mutex_lock(engine->request_queue_guard);
402 	msg = apt_cyclic_queue_pop(engine->request_queue);
403 	while(msg) {
404 		apr_thread_mutex_unlock(engine->request_queue_guard);
405 		apt_task_msg_process(engine->task,msg);
406 		apr_thread_mutex_lock(engine->request_queue_guard);
407 		msg = apt_cyclic_queue_pop(engine->request_queue);
408 	}
409 	apr_thread_mutex_unlock(engine->request_queue_guard);
410 
411 	/* process factory of media contexts */
412 	mpf_context_factory_process(engine->context_factory);
413 }
414 
mpf_engine_timer_proc(mpf_scheduler_t * scheduler,void * obj)415 static void mpf_engine_timer_proc(mpf_scheduler_t *scheduler, void *obj)
416 {
417 	mpf_engine_t *engine = obj;
418 	apt_timer_queue_advance(engine->timer_queue,MPF_TIMER_RESOLUTION);
419 }
420 
mpf_engine_codec_manager_create(apr_pool_t * pool)421 MPF_DECLARE(mpf_codec_manager_t*) mpf_engine_codec_manager_create(apr_pool_t *pool)
422 {
423 	mpf_codec_manager_t *codec_manager = mpf_codec_manager_create(4,pool);
424 	if(codec_manager) {
425 		mpf_codec_t *codec;
426 
427 		codec = mpf_codec_g711u_create(pool);
428 		mpf_codec_manager_codec_register(codec_manager,codec);
429 
430 		codec = mpf_codec_g711a_create(pool);
431 		mpf_codec_manager_codec_register(codec_manager,codec);
432 
433 		codec = mpf_codec_l16_create(pool);
434 		mpf_codec_manager_codec_register(codec_manager,codec);
435 	}
436 	return codec_manager;
437 }
438 
mpf_engine_codec_manager_register(mpf_engine_t * engine,const mpf_codec_manager_t * codec_manager)439 MPF_DECLARE(apt_bool_t) mpf_engine_codec_manager_register(mpf_engine_t *engine, const mpf_codec_manager_t *codec_manager)
440 {
441 	engine->codec_manager = codec_manager;
442 	return TRUE;
443 }
444 
mpf_engine_scheduler_rate_set(mpf_engine_t * engine,unsigned long rate)445 MPF_DECLARE(apt_bool_t) mpf_engine_scheduler_rate_set(mpf_engine_t *engine, unsigned long rate)
446 {
447 	return mpf_scheduler_rate_set(engine->scheduler,rate);
448 }
449 
mpf_engine_id_get(const mpf_engine_t * engine)450 MPF_DECLARE(const char*) mpf_engine_id_get(const mpf_engine_t *engine)
451 {
452 	return apt_task_name_get(engine->task);
453 }
454