1 /*
2 * Copyright 2008-2014 Arsen Chaloyan
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 * $Id: mpf_engine.c 2226 2014-11-12 00:47:40Z achaloyan@gmail.com $
17 */
18
19 #include "mpf_engine.h"
20 #include "mpf_context.h"
21 #include "mpf_termination.h"
22 #include "mpf_stream.h"
23 #include "mpf_scheduler.h"
24 #include "mpf_codec_descriptor.h"
25 #include "mpf_codec_manager.h"
26 #include "apt_obj_list.h"
27 #include "apt_cyclic_queue.h"
28 #include "apt_log.h"
29
30 #define MPF_TIMER_RESOLUTION 100 /* 100 ms */
31
32 struct mpf_engine_t {
33 apr_pool_t *pool;
34 apt_task_t *task;
35 apt_task_msg_type_e task_msg_type;
36 apr_thread_mutex_t *request_queue_guard;
37 apt_cyclic_queue_t *request_queue;
38 mpf_context_factory_t *context_factory;
39 mpf_scheduler_t *scheduler;
40 apt_timer_queue_t *timer_queue;
41 const mpf_codec_manager_t *codec_manager;
42 };
43
44 static void mpf_engine_main(mpf_scheduler_t *scheduler, void *obj);
45 static void mpf_engine_timer_proc(mpf_scheduler_t *scheduler, void *obj);
46 static apt_bool_t mpf_engine_destroy(apt_task_t *task);
47 static apt_bool_t mpf_engine_start(apt_task_t *task);
48 static apt_bool_t mpf_engine_terminate(apt_task_t *task);
49 static apt_bool_t mpf_engine_msg_signal(apt_task_t *task, apt_task_msg_t *msg);
50 static apt_bool_t mpf_engine_msg_process(apt_task_t *task, apt_task_msg_t *msg);
51
52
53 mpf_codec_t* mpf_codec_l16_create(apr_pool_t *pool);
54 mpf_codec_t* mpf_codec_g711u_create(apr_pool_t *pool);
55 mpf_codec_t* mpf_codec_g711a_create(apr_pool_t *pool);
56
mpf_engine_create(const char * id,apr_pool_t * pool)57 MPF_DECLARE(mpf_engine_t*) mpf_engine_create(const char *id, apr_pool_t *pool)
58 {
59 apt_task_vtable_t *vtable;
60 apt_task_msg_pool_t *msg_pool;
61 mpf_engine_t *engine = apr_palloc(pool,sizeof(mpf_engine_t));
62 engine->pool = pool;
63 engine->request_queue = NULL;
64 engine->context_factory = NULL;
65 engine->codec_manager = NULL;
66
67 msg_pool = apt_task_msg_pool_create_dynamic(sizeof(mpf_message_container_t),pool);
68
69 apt_log(APT_LOG_MARK,APT_PRIO_NOTICE,"Create Media Engine [%s]",id);
70 engine->task = apt_task_create(engine,msg_pool,pool);
71 if(!engine->task) {
72 return NULL;
73 }
74
75 apt_task_name_set(engine->task,id);
76
77 vtable = apt_task_vtable_get(engine->task);
78 if(vtable) {
79 vtable->destroy = mpf_engine_destroy;
80 vtable->start = mpf_engine_start;
81 vtable->terminate = mpf_engine_terminate;
82 vtable->signal_msg = mpf_engine_msg_signal;
83 vtable->process_msg = mpf_engine_msg_process;
84 }
85
86 engine->task_msg_type = TASK_MSG_USER;
87
88 engine->context_factory = mpf_context_factory_create(engine->pool);
89 engine->request_queue = apt_cyclic_queue_create(CYCLIC_QUEUE_DEFAULT_SIZE);
90 apr_thread_mutex_create(&engine->request_queue_guard,APR_THREAD_MUTEX_UNNESTED,engine->pool);
91
92 engine->scheduler = mpf_scheduler_create(engine->pool);
93 mpf_scheduler_media_clock_set(engine->scheduler,CODEC_FRAME_TIME_BASE,mpf_engine_main,engine);
94
95 engine->timer_queue = apt_timer_queue_create(engine->pool);
96 mpf_scheduler_timer_clock_set(engine->scheduler,MPF_TIMER_RESOLUTION,mpf_engine_timer_proc,engine);
97 return engine;
98 }
99
mpf_engine_context_create(mpf_engine_t * engine,const char * name,void * obj,apr_size_t max_termination_count,apr_pool_t * pool)100 MPF_DECLARE(mpf_context_t*) mpf_engine_context_create(
101 mpf_engine_t *engine,
102 const char *name,
103 void *obj,
104 apr_size_t max_termination_count,
105 apr_pool_t *pool)
106 {
107 return mpf_context_create(engine->context_factory,name,obj,max_termination_count,pool);
108 }
109
mpf_engine_context_destroy(mpf_context_t * context)110 MPF_DECLARE(apt_bool_t) mpf_engine_context_destroy(mpf_context_t *context)
111 {
112 return mpf_context_destroy(context);
113 }
114
mpf_engine_context_object_get(const mpf_context_t * context)115 MPF_DECLARE(void*) mpf_engine_context_object_get(const mpf_context_t *context)
116 {
117 return mpf_context_object_get(context);
118 }
119
mpf_task_get(const mpf_engine_t * engine)120 MPF_DECLARE(apt_task_t*) mpf_task_get(const mpf_engine_t *engine)
121 {
122 return engine->task;
123 }
124
mpf_engine_task_msg_type_set(mpf_engine_t * engine,apt_task_msg_type_e type)125 MPF_DECLARE(void) mpf_engine_task_msg_type_set(mpf_engine_t *engine, apt_task_msg_type_e type)
126 {
127 engine->task_msg_type = type;
128 }
129
mpf_engine_message_get(mpf_engine_t * engine,mpf_task_msg_t ** task_msg)130 static mpf_message_t* mpf_engine_message_get(mpf_engine_t *engine, mpf_task_msg_t **task_msg)
131 {
132 mpf_message_container_t *container;
133 mpf_message_t *mpf_message;
134 if(*task_msg) {
135 container = (mpf_message_container_t*) (*task_msg)->data;
136 if(container->count >= MAX_MPF_MESSAGE_COUNT) {
137 /* container has been already filled,
138 implicitly send the requests and get new task message */
139 mpf_engine_message_send(engine,task_msg);
140 return mpf_engine_message_get(engine,task_msg);
141 }
142 }
143 else {
144 *task_msg = apt_task_msg_get(engine->task);
145 container = (mpf_message_container_t*) (*task_msg)->data;
146 container->count = 0;
147 }
148
149 mpf_message = &container->messages[container->count];
150 container->count++;
151 return mpf_message;
152 }
153
154
mpf_engine_termination_message_add(mpf_engine_t * engine,mpf_command_type_e command_id,mpf_context_t * context,mpf_termination_t * termination,void * descriptor,mpf_task_msg_t ** task_msg)155 MPF_DECLARE(apt_bool_t) mpf_engine_termination_message_add(
156 mpf_engine_t *engine,
157 mpf_command_type_e command_id,
158 mpf_context_t *context,
159 mpf_termination_t *termination,
160 void *descriptor,
161 mpf_task_msg_t **task_msg)
162 {
163 mpf_message_t *mpf_message = mpf_engine_message_get(engine,task_msg);
164 if(!mpf_message) {
165 return FALSE;
166 }
167 mpf_message->message_type = MPF_MESSAGE_TYPE_REQUEST;
168 mpf_message->command_id = command_id;
169 mpf_message->context = context;
170 mpf_message->termination = termination;
171 mpf_message->assoc_termination = NULL;
172 mpf_message->descriptor = descriptor;
173 return TRUE;
174 }
175
mpf_engine_assoc_message_add(mpf_engine_t * engine,mpf_command_type_e command_id,mpf_context_t * context,mpf_termination_t * termination,mpf_termination_t * assoc_termination,mpf_task_msg_t ** task_msg)176 MPF_DECLARE(apt_bool_t) mpf_engine_assoc_message_add(
177 mpf_engine_t *engine,
178 mpf_command_type_e command_id,
179 mpf_context_t *context,
180 mpf_termination_t *termination,
181 mpf_termination_t *assoc_termination,
182 mpf_task_msg_t **task_msg)
183 {
184 mpf_message_t *mpf_message = mpf_engine_message_get(engine,task_msg);
185 if(!mpf_message) {
186 return FALSE;
187 }
188 mpf_message->message_type = MPF_MESSAGE_TYPE_REQUEST;
189 mpf_message->command_id = command_id;
190 mpf_message->context = context;
191 mpf_message->termination = termination;
192 mpf_message->assoc_termination = assoc_termination;
193 mpf_message->descriptor = NULL;
194 return TRUE;
195 }
196
mpf_engine_topology_message_add(mpf_engine_t * engine,mpf_command_type_e command_id,mpf_context_t * context,mpf_task_msg_t ** task_msg)197 MPF_DECLARE(apt_bool_t) mpf_engine_topology_message_add(
198 mpf_engine_t *engine,
199 mpf_command_type_e command_id,
200 mpf_context_t *context,
201 mpf_task_msg_t **task_msg)
202 {
203 mpf_message_t *mpf_message = mpf_engine_message_get(engine,task_msg);
204 if(!mpf_message) {
205 return FALSE;
206 }
207 mpf_message->message_type = MPF_MESSAGE_TYPE_REQUEST;
208 mpf_message->command_id = command_id;
209 mpf_message->context = context;
210 mpf_message->termination = NULL;
211 mpf_message->assoc_termination = NULL;
212 mpf_message->descriptor = NULL;
213 return TRUE;
214 }
215
mpf_engine_message_send(mpf_engine_t * engine,mpf_task_msg_t ** task_msg)216 MPF_DECLARE(apt_bool_t) mpf_engine_message_send(mpf_engine_t *engine, mpf_task_msg_t **task_msg)
217 {
218 apt_bool_t status = FALSE;
219 if(*task_msg) {
220 status = apt_task_msg_signal(engine->task,*task_msg);
221 *task_msg = NULL;
222 }
223 return status;
224 }
225
mpf_engine_destroy(apt_task_t * task)226 static apt_bool_t mpf_engine_destroy(apt_task_t *task)
227 {
228 mpf_engine_t *engine = apt_task_object_get(task);
229
230 apt_timer_queue_destroy(engine->timer_queue);
231 mpf_scheduler_destroy(engine->scheduler);
232 mpf_context_factory_destroy(engine->context_factory);
233 apt_cyclic_queue_destroy(engine->request_queue);
234 apr_thread_mutex_destroy(engine->request_queue_guard);
235 return TRUE;
236 }
237
mpf_engine_start(apt_task_t * task)238 static apt_bool_t mpf_engine_start(apt_task_t *task)
239 {
240 mpf_engine_t *engine = apt_task_object_get(task);
241
242 mpf_scheduler_start(engine->scheduler);
243 apt_task_start_request_process(task);
244 return TRUE;
245 }
246
mpf_engine_terminate(apt_task_t * task)247 static apt_bool_t mpf_engine_terminate(apt_task_t *task)
248 {
249 mpf_engine_t *engine = apt_task_object_get(task);
250
251 mpf_scheduler_stop(engine->scheduler);
252 apt_task_terminate_request_process(task);
253 return TRUE;
254 }
255
mpf_engine_event_raise(mpf_termination_t * termination,int event_id,void * descriptor)256 static apt_bool_t mpf_engine_event_raise(mpf_termination_t *termination, int event_id, void *descriptor)
257 {
258 apt_task_msg_t *task_msg;
259 mpf_message_container_t *event_msg;
260 mpf_message_t *mpf_message;
261 mpf_engine_t *engine;
262 engine = termination->media_engine;
263 if(!engine) {
264 return FALSE;
265 }
266
267 task_msg = apt_task_msg_get(engine->task);
268 if(!task_msg) {
269 return FALSE;
270 }
271 task_msg->type = engine->task_msg_type;
272 event_msg = (mpf_message_container_t*) task_msg->data;
273 mpf_message = event_msg->messages;
274 event_msg->count = 1;
275
276 mpf_message->command_id = event_id;
277 mpf_message->message_type = MPF_MESSAGE_TYPE_EVENT;
278 mpf_message->status_code = MPF_STATUS_CODE_SUCCESS;
279 mpf_message->context = NULL;
280 mpf_message->termination = termination;
281 mpf_message->descriptor = descriptor;
282
283 return apt_task_msg_parent_signal(engine->task,task_msg);
284 }
285
mpf_engine_msg_signal(apt_task_t * task,apt_task_msg_t * msg)286 static apt_bool_t mpf_engine_msg_signal(apt_task_t *task, apt_task_msg_t *msg)
287 {
288 mpf_engine_t *engine = apt_task_object_get(task);
289
290 apr_thread_mutex_lock(engine->request_queue_guard);
291 if(apt_cyclic_queue_push(engine->request_queue,msg) == FALSE) {
292 apt_log(APT_LOG_MARK,APT_PRIO_ERROR,"MPF Request Queue is Full [%s]",apt_task_name_get(task));
293 }
294 apr_thread_mutex_unlock(engine->request_queue_guard);
295 return TRUE;
296 }
297
mpf_engine_msg_process(apt_task_t * task,apt_task_msg_t * msg)298 static apt_bool_t mpf_engine_msg_process(apt_task_t *task, apt_task_msg_t *msg)
299 {
300 apr_size_t i;
301 mpf_engine_t *engine = apt_task_object_get(task);
302 apt_task_msg_t *response_msg;
303 mpf_message_container_t *response;
304 mpf_message_t *mpf_response;
305 mpf_context_t *context;
306 mpf_termination_t *termination;
307 const mpf_message_t *mpf_request;
308 const mpf_message_container_t *request = (const mpf_message_container_t*) msg->data;
309
310 response_msg = apt_task_msg_get(engine->task);
311 if(!response_msg) {
312 return FALSE;
313 }
314 response_msg->type = engine->task_msg_type;
315 response = (mpf_message_container_t*) response_msg->data;
316 *response = *request;
317 for(i=0; i<request->count; i++) {
318 mpf_request = &request->messages[i];
319 mpf_response = &response->messages[i];
320
321 if(mpf_request->message_type != MPF_MESSAGE_TYPE_REQUEST) {
322 apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Invalid MPF Message Type [%d]",mpf_request->message_type);
323 continue;
324 }
325
326 mpf_response->message_type = MPF_MESSAGE_TYPE_RESPONSE;
327 mpf_response->status_code = MPF_STATUS_CODE_SUCCESS;
328 context = mpf_request->context;
329 termination = mpf_request->termination;
330 switch(mpf_request->command_id) {
331 case MPF_ADD_TERMINATION:
332 {
333 termination->media_engine = engine;
334 termination->event_handler = mpf_engine_event_raise;
335 termination->codec_manager = engine->codec_manager;
336 termination->timer_queue = engine->timer_queue;
337
338 mpf_termination_add(termination,mpf_request->descriptor);
339 if(mpf_context_termination_add(context,termination) == FALSE) {
340 mpf_termination_subtract(termination);
341 mpf_response->status_code = MPF_STATUS_CODE_FAILURE;
342 break;
343 }
344 break;
345 }
346 case MPF_MODIFY_TERMINATION:
347 {
348 mpf_termination_modify(termination,mpf_request->descriptor);
349 break;
350 }
351 case MPF_SUBTRACT_TERMINATION:
352 {
353 if(mpf_context_termination_subtract(context,termination) == FALSE) {
354 mpf_response->status_code = MPF_STATUS_CODE_FAILURE;
355 break;
356 }
357 mpf_termination_subtract(termination);
358 break;
359 }
360 case MPF_ADD_ASSOCIATION:
361 {
362 mpf_context_association_add(context,termination,mpf_request->assoc_termination);
363 break;
364 }
365 case MPF_REMOVE_ASSOCIATION:
366 {
367 mpf_context_association_remove(context,termination,mpf_request->assoc_termination);
368 break;
369 }
370 case MPF_RESET_ASSOCIATIONS:
371 {
372 mpf_context_associations_reset(context);
373 break;
374 }
375 case MPF_APPLY_TOPOLOGY:
376 {
377 mpf_context_topology_apply(context);
378 break;
379 }
380 case MPF_DESTROY_TOPOLOGY:
381 {
382 mpf_context_topology_destroy(context);
383 break;
384 }
385 default:
386 {
387 mpf_response->status_code = MPF_STATUS_CODE_FAILURE;
388 }
389 }
390 }
391
392 return apt_task_msg_parent_signal(engine->task,response_msg);
393 }
394
/**
 * Media clock callback: drains the request queue, processing each message
 * through the engine task, and then processes the context factory.
 * @param scheduler the scheduler invoking the callback (not used)
 * @param obj the engine registered with the media clock
 */
static void mpf_engine_main(mpf_scheduler_t *scheduler, void *obj)
{
	mpf_engine_t *self = obj;

	/* process request queue */
	for(;;) {
		apt_task_msg_t *pending;

		/* the mutex is held only while popping, not while processing */
		apr_thread_mutex_lock(self->request_queue_guard);
		pending = apt_cyclic_queue_pop(self->request_queue);
		apr_thread_mutex_unlock(self->request_queue_guard);

		if(pending == NULL) {
			break;
		}
		apt_task_msg_process(self->task,pending);
	}

	/* process factory of media contexts */
	mpf_context_factory_process(self->context_factory);
}
414
/**
 * Timer clock callback: advances the timer queue of the engine by
 * MPF_TIMER_RESOLUTION.
 * @param scheduler the scheduler invoking the callback (not used)
 * @param obj the engine registered with the timer clock
 */
static void mpf_engine_timer_proc(mpf_scheduler_t *scheduler, void *obj)
{
	mpf_engine_t *self = obj;
	apt_timer_queue_t *timers = self->timer_queue;

	apt_timer_queue_advance(timers,MPF_TIMER_RESOLUTION);
}
420
mpf_engine_codec_manager_create(apr_pool_t * pool)421 MPF_DECLARE(mpf_codec_manager_t*) mpf_engine_codec_manager_create(apr_pool_t *pool)
422 {
423 mpf_codec_manager_t *codec_manager = mpf_codec_manager_create(4,pool);
424 if(codec_manager) {
425 mpf_codec_t *codec;
426
427 codec = mpf_codec_g711u_create(pool);
428 mpf_codec_manager_codec_register(codec_manager,codec);
429
430 codec = mpf_codec_g711a_create(pool);
431 mpf_codec_manager_codec_register(codec_manager,codec);
432
433 codec = mpf_codec_l16_create(pool);
434 mpf_codec_manager_codec_register(codec_manager,codec);
435 }
436 return codec_manager;
437 }
438
mpf_engine_codec_manager_register(mpf_engine_t * engine,const mpf_codec_manager_t * codec_manager)439 MPF_DECLARE(apt_bool_t) mpf_engine_codec_manager_register(mpf_engine_t *engine, const mpf_codec_manager_t *codec_manager)
440 {
441 engine->codec_manager = codec_manager;
442 return TRUE;
443 }
444
mpf_engine_scheduler_rate_set(mpf_engine_t * engine,unsigned long rate)445 MPF_DECLARE(apt_bool_t) mpf_engine_scheduler_rate_set(mpf_engine_t *engine, unsigned long rate)
446 {
447 return mpf_scheduler_rate_set(engine->scheduler,rate);
448 }
449
mpf_engine_id_get(const mpf_engine_t * engine)450 MPF_DECLARE(const char*) mpf_engine_id_get(const mpf_engine_t *engine)
451 {
452 return apt_task_name_get(engine->task);
453 }
454