1 /*
2 * Copyright (c) 2014-2017, Siemens AG. All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are met:
6 *
7 * 1. Redistributions of source code must retain the above copyright notice,
8 * this list of conditions and the following disclaimer.
9 *
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
15 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
18 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
19 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
20 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
21 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
22 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
23 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
24 * POSSIBILITY OF SUCH DAMAGE.
25 */
26
27 #include <assert.h>
28
29 #include <embb/mtapi/c/mtapi.h>
30 #include <embb/base/c/atomic.h>
31
32 #include <embb/base/c/internal/unused.h>
33
34 #include <embb_mtapi_log.h>
35 #include <mtapi_status_t.h>
36 #include <embb_mtapi_queue_t.h>
37 #include <embb_mtapi_group_t.h>
38 #include <embb_mtapi_node_t.h>
39 #include <embb_mtapi_task_t.h>
40 #include <embb_mtapi_job_t.h>
41 #include <embb_mtapi_action_t.h>
42 #include <embb_mtapi_pool_template-inl.h>
43 #include <embb_mtapi_task_queue_t.h>
44 #include <embb_mtapi_scheduler_t.h>
45 #include <embb_mtapi_thread_context_t.h>
46 #include <embb_mtapi_attr.h>
47
48
49 /* ---- POOL STORAGE FUNCTIONS --------------------------------------------- */
50
embb_mtapi_pool_implementation(queue)51 embb_mtapi_pool_implementation(queue)
52
53
54 /* ---- CLASS MEMBERS ------------------------------------------------------ */
55
56 void embb_mtapi_queue_initialize(embb_mtapi_queue_t* that) {
57 assert(MTAPI_NULL != that);
58
59 mtapi_queueattr_init(&that->attributes, MTAPI_NULL);
60 that->queue_id = MTAPI_QUEUE_ID_NONE;
61 embb_atomic_init_int(&that->ordered_task_executing, 0);
62 embb_atomic_init_char(&that->enabled, MTAPI_FALSE);
63 embb_atomic_init_int(&that->num_tasks, 0);
64 that->job_handle.id = 0;
65 that->job_handle.tag = 0;
66 embb_mtapi_task_queue_initialize(&that->retained_tasks);
67 embb_mtapi_task_queue_initialize(&that->ordered_tasks);
68 }
69
embb_mtapi_queue_initialize_with_attributes_and_job(embb_mtapi_queue_t * that,mtapi_queue_attributes_t * attributes,mtapi_job_hndl_t job)70 void embb_mtapi_queue_initialize_with_attributes_and_job(
71 embb_mtapi_queue_t* that,
72 mtapi_queue_attributes_t* attributes,
73 mtapi_job_hndl_t job) {
74 assert(MTAPI_NULL != that);
75 assert(MTAPI_NULL != attributes);
76
77 that->attributes = *attributes;
78 that->queue_id = MTAPI_QUEUE_ID_NONE;
79 embb_atomic_init_int(&that->ordered_task_executing, 0);
80 embb_atomic_init_char(&that->enabled, MTAPI_TRUE);
81 embb_atomic_init_int(&that->num_tasks, 0);
82 that->job_handle = job;
83 embb_mtapi_task_queue_initialize(&that->retained_tasks);
84 embb_mtapi_task_queue_initialize(&that->ordered_tasks);
85 }
86
embb_mtapi_queue_finalize(embb_mtapi_queue_t * that)87 void embb_mtapi_queue_finalize(embb_mtapi_queue_t* that) {
88 assert(MTAPI_NULL != that);
89
90 embb_mtapi_task_queue_finalize(&that->ordered_tasks);
91 embb_mtapi_task_queue_finalize(&that->retained_tasks);
92 that->job_handle.id = 0;
93 that->job_handle.tag = 0;
94 embb_atomic_destroy_int(&that->num_tasks);
95 embb_atomic_destroy_char(&that->enabled);
96 embb_atomic_destroy_int(&that->ordered_task_executing);
97 that->queue_id = MTAPI_QUEUE_ID_NONE;
98 }
99
embb_mtapi_queue_task_started(embb_mtapi_queue_t * that)100 void embb_mtapi_queue_task_started(embb_mtapi_queue_t* that) {
101 assert(MTAPI_NULL != that);
102 embb_atomic_fetch_and_add_int(&that->num_tasks, 1);
103 }
104
embb_mtapi_queue_task_finished(embb_mtapi_queue_t * that)105 void embb_mtapi_queue_task_finished(embb_mtapi_queue_t* that) {
106 assert(MTAPI_NULL != that);
107 embb_atomic_fetch_and_add_int(&that->num_tasks, -1);
108 }
109
embb_mtapi_queue_ordered_task_start(embb_mtapi_queue_t * that)110 int embb_mtapi_queue_ordered_task_start(
111 embb_mtapi_queue_t* that) {
112 int expected = 0;
113 return embb_atomic_compare_and_swap_int(
114 &that->ordered_task_executing, &expected, 1);
115 }
116
embb_mtapi_queue_ordered_task_finish(embb_mtapi_queue_t * that)117 void embb_mtapi_queue_ordered_task_finish(
118 embb_mtapi_queue_t* that) {
119 embb_atomic_store_int(&that->ordered_task_executing, 0);
120 }
121
embb_mtapi_queue_delete_visitor(embb_mtapi_task_t * task,void * user_data)122 static mtapi_boolean_t embb_mtapi_queue_delete_visitor(
123 embb_mtapi_task_t * task,
124 void * user_data) {
125 embb_mtapi_queue_t * queue = (embb_mtapi_queue_t*)user_data;
126
127 assert(MTAPI_NULL != queue);
128 assert(MTAPI_NULL != task);
129
130 if (task->queue.id == queue->handle.id &&
131 task->queue.tag == queue->handle.tag) {
132 /* task is scheduled and needs to be cancelled */
133 embb_mtapi_task_set_state(task, MTAPI_TASK_CANCELLED);
134 task->error_code = MTAPI_ERR_QUEUE_DELETED;
135 }
136
137 /* do not remove task from queue */
138 return MTAPI_TRUE;
139 }
140
embb_mtapi_queue_disable_visitor(embb_mtapi_task_t * task,void * user_data)141 static mtapi_boolean_t embb_mtapi_queue_disable_visitor(
142 embb_mtapi_task_t * task,
143 void * user_data) {
144 embb_mtapi_queue_t * queue = (embb_mtapi_queue_t*)user_data;
145 /* do not remove task from queue by default */
146 mtapi_boolean_t result = MTAPI_TRUE;
147
148 assert(MTAPI_NULL != queue);
149 assert(MTAPI_NULL != task);
150
151 if (task->queue.id == queue->handle.id &&
152 task->queue.tag == queue->handle.tag) {
153 if (queue->attributes.retain) {
154 /* task is scheduled and needs to be retained */
155 embb_mtapi_task_set_state(task, MTAPI_TASK_RETAINED);
156 embb_mtapi_task_queue_push_back(&queue->retained_tasks, task);
157 /* remove task from queue */
158 result = MTAPI_FALSE;
159 } else {
160 /* task is scheduled and needs to be cancelled */
161 embb_mtapi_task_set_state(task, MTAPI_TASK_CANCELLED);
162 task->error_code = MTAPI_ERR_QUEUE_DISABLED;
163 }
164 }
165
166 return result;
167 }
168
169
170 /* ---- INTERFACE FUNCTIONS ------------------------------------------------ */
171
mtapi_queue_create(MTAPI_IN mtapi_queue_id_t queue_id,MTAPI_IN mtapi_job_hndl_t job,MTAPI_IN mtapi_queue_attributes_t * attributes,MTAPI_OUT mtapi_status_t * status)172 mtapi_queue_hndl_t mtapi_queue_create(
173 MTAPI_IN mtapi_queue_id_t queue_id,
174 MTAPI_IN mtapi_job_hndl_t job,
175 MTAPI_IN mtapi_queue_attributes_t* attributes,
176 MTAPI_OUT mtapi_status_t* status) {
177 mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
178 embb_mtapi_node_t* node = embb_mtapi_node_get_instance();
179 embb_mtapi_queue_t* queue = MTAPI_NULL;
180 mtapi_queue_hndl_t queue_hndl = { 0, EMBB_MTAPI_IDPOOL_INVALID_ID };
181 mtapi_queue_attributes_t attr;
182
183 embb_mtapi_log_trace("mtapi_queue_create() called\n");
184
185 if (embb_mtapi_node_is_initialized()) {
186 queue = embb_mtapi_queue_pool_allocate(node->queue_pool);
187 if (MTAPI_NULL != queue) {
188 if (MTAPI_NULL != attributes) {
189 attr = *attributes;
190 local_status = MTAPI_SUCCESS;
191 } else {
192 mtapi_queueattr_init(&attr, &local_status);
193 }
194 if (MTAPI_SUCCESS == local_status) {
195 if (embb_mtapi_job_is_handle_valid(node, job)) {
196 embb_mtapi_queue_initialize_with_attributes_and_job(
197 queue, &attr, job);
198 /* for an ordered queue, initialize affinity */
199 if (queue->attributes.ordered) {
200 mtapi_affinity_init(
201 &queue->ordered_affinity,
202 MTAPI_FALSE, MTAPI_NULL);
203 mtapi_affinity_set(
204 &queue->ordered_affinity,
205 queue->handle.id % node->scheduler->worker_count,
206 MTAPI_TRUE, MTAPI_NULL);
207 }
208 queue->queue_id = queue_id;
209 queue_hndl = queue->handle;
210 } else {
211 embb_mtapi_queue_pool_deallocate(node->queue_pool, queue);
212 local_status = MTAPI_ERR_JOB_INVALID;
213 }
214 } else {
215 embb_mtapi_queue_pool_deallocate(node->queue_pool, queue);
216 }
217 } else {
218 local_status = MTAPI_ERR_QUEUE_LIMIT;
219 }
220 } else {
221 local_status = MTAPI_ERR_NODE_NOTINIT;
222 }
223
224 mtapi_status_set(status, local_status);
225 return queue_hndl;
226 }
227
mtapi_queue_set_attribute(MTAPI_IN mtapi_queue_hndl_t queue,MTAPI_IN mtapi_uint_t attribute_num,MTAPI_IN void * attribute,MTAPI_IN mtapi_size_t attribute_size,MTAPI_OUT mtapi_status_t * status)228 void mtapi_queue_set_attribute(
229 MTAPI_IN mtapi_queue_hndl_t queue,
230 MTAPI_IN mtapi_uint_t attribute_num,
231 MTAPI_IN void* attribute,
232 MTAPI_IN mtapi_size_t attribute_size,
233 MTAPI_OUT mtapi_status_t* status) {
234 mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
235 embb_mtapi_node_t* node = embb_mtapi_node_get_instance();
236 embb_mtapi_queue_t* local_queue;
237
238 embb_mtapi_log_trace("mtapi_queue_set_attribute() called\n");
239
240 if (embb_mtapi_node_is_initialized()) {
241 if (embb_mtapi_queue_pool_is_handle_valid(node->queue_pool, queue)) {
242 local_queue = embb_mtapi_queue_pool_get_storage_for_handle(
243 node->queue_pool, queue);
244 mtapi_queueattr_set(&local_queue->attributes, attribute_num,
245 attribute, attribute_size, &local_status);
246 } else {
247 local_status = MTAPI_ERR_QUEUE_INVALID;
248 }
249 } else {
250 local_status = MTAPI_ERR_NODE_NOTINIT;
251 }
252
253 mtapi_status_set(status, local_status);
254 }
255
mtapi_queue_get_attribute(MTAPI_IN mtapi_queue_hndl_t queue,MTAPI_IN mtapi_uint_t attribute_num,MTAPI_OUT void * attribute,MTAPI_IN mtapi_size_t attribute_size,MTAPI_OUT mtapi_status_t * status)256 void mtapi_queue_get_attribute(
257 MTAPI_IN mtapi_queue_hndl_t queue,
258 MTAPI_IN mtapi_uint_t attribute_num,
259 MTAPI_OUT void* attribute,
260 MTAPI_IN mtapi_size_t attribute_size,
261 MTAPI_OUT mtapi_status_t* status
262 ) {
263 mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
264 embb_mtapi_node_t* node = embb_mtapi_node_get_instance();
265 embb_mtapi_queue_t* local_queue;
266
267 embb_mtapi_log_trace("mtapi_queue_get_attribute() called\n");
268
269 if (embb_mtapi_node_is_initialized()) {
270 if (embb_mtapi_queue_pool_is_handle_valid(node->queue_pool, queue)) {
271 local_queue = embb_mtapi_queue_pool_get_storage_for_handle(
272 node->queue_pool, queue);
273
274 if (MTAPI_NULL == attribute) {
275 local_status = MTAPI_ERR_PARAMETER;
276 } else {
277 switch (attribute_num) {
278 case MTAPI_QUEUE_GLOBAL:
279 local_status = embb_mtapi_attr_get_mtapi_boolean_t(
280 &local_queue->attributes.global, attribute, attribute_size);
281 break;
282
283 case MTAPI_QUEUE_PRIORITY:
284 local_status = embb_mtapi_attr_get_mtapi_uint_t(
285 &local_queue->attributes.priority, attribute, attribute_size);
286 break;
287
288 case MTAPI_QUEUE_LIMIT:
289 local_status = embb_mtapi_attr_get_mtapi_uint_t(
290 &local_queue->attributes.limit, attribute, attribute_size);
291 break;
292
293 case MTAPI_QUEUE_ORDERED:
294 local_status = embb_mtapi_attr_get_mtapi_boolean_t(
295 &local_queue->attributes.ordered, attribute, attribute_size);
296 break;
297
298 case MTAPI_QUEUE_RETAIN:
299 local_status = embb_mtapi_attr_get_mtapi_boolean_t(
300 &local_queue->attributes.retain, attribute, attribute_size);
301 break;
302
303 case MTAPI_QUEUE_DOMAIN_SHARED:
304 local_status = embb_mtapi_attr_get_mtapi_boolean_t(
305 &local_queue->attributes.domain_shared, attribute, attribute_size);
306 break;
307
308 default:
309 local_status = MTAPI_ERR_ATTR_NUM;
310 break;
311 }
312 }
313 } else {
314 local_status = MTAPI_ERR_QUEUE_INVALID;
315 }
316 } else {
317 local_status = MTAPI_ERR_NODE_NOTINIT;
318 }
319
320 mtapi_status_set(status, local_status);
321 }
322
mtapi_queue_get(MTAPI_IN mtapi_queue_id_t queue_id,MTAPI_IN mtapi_domain_t domain_id,MTAPI_OUT mtapi_status_t * status)323 mtapi_queue_hndl_t mtapi_queue_get(
324 MTAPI_IN mtapi_queue_id_t queue_id,
325 MTAPI_IN mtapi_domain_t domain_id,
326 MTAPI_OUT mtapi_status_t* status) {
327 mtapi_queue_hndl_t queue_hndl = { 0, EMBB_MTAPI_IDPOOL_INVALID_ID };
328 mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
329
330 EMBB_UNUSED(domain_id);
331
332 embb_mtapi_log_trace("mtapi_queue_get() called\n");
333
334 if (embb_mtapi_node_is_initialized()) {
335 embb_mtapi_node_t* node = embb_mtapi_node_get_instance();
336 mtapi_uint_t ii;
337
338 local_status = MTAPI_ERR_QUEUE_INVALID;
339 for (ii = 0; ii < node->attributes.max_queues; ii++) {
340 if (queue_id == node->queue_pool->storage[ii].queue_id) {
341 queue_hndl = node->queue_pool->storage[ii].handle;
342 local_status = MTAPI_SUCCESS;
343 break;
344 }
345 }
346 } else {
347 local_status = MTAPI_ERR_NODE_NOTINIT;
348 }
349
350 mtapi_status_set(status, local_status);
351 return queue_hndl;
352 }
353
mtapi_queue_delete(MTAPI_IN mtapi_queue_hndl_t queue,MTAPI_IN mtapi_timeout_t timeout,MTAPI_OUT mtapi_status_t * status)354 void mtapi_queue_delete(
355 MTAPI_IN mtapi_queue_hndl_t queue,
356 MTAPI_IN mtapi_timeout_t timeout,
357 MTAPI_OUT mtapi_status_t* status) {
358 mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
359
360 embb_mtapi_log_trace("mtapi_queue_delete() called\n");
361
362 if (embb_mtapi_node_is_initialized()) {
363 embb_mtapi_node_t* node = embb_mtapi_node_get_instance();
364 if (embb_mtapi_queue_pool_is_handle_valid(node->queue_pool, queue)) {
365 embb_mtapi_queue_t* local_queue =
366 embb_mtapi_queue_pool_get_storage_for_handle(
367 node->queue_pool, queue);
368 embb_mtapi_thread_context_t * context = NULL;
369
370 embb_duration_t wait_duration;
371 embb_time_t start_time;
372 embb_time_t end_time;
373 if (MTAPI_INFINITE < timeout) {
374 embb_duration_set_milliseconds(
375 &wait_duration, (unsigned long long)timeout);
376 embb_time_now(&start_time);
377 embb_time_in(&end_time, &wait_duration);
378 }
379
380 /* find out on which thread we are */
381 context = embb_mtapi_scheduler_get_current_thread_context(
382 node->scheduler);
383
384 if (local_queue->attributes.retain) {
385 /* reschedule retained tasks */
386 embb_mtapi_task_t * task =
387 embb_mtapi_task_queue_pop_front(&local_queue->retained_tasks);
388 while (MTAPI_NULL != task) {
389 embb_mtapi_task_set_state(task, MTAPI_TASK_SCHEDULED);
390 embb_mtapi_scheduler_schedule_task(node->scheduler, task);
391 task = embb_mtapi_task_queue_pop_front(&local_queue->retained_tasks);
392 }
393 }
394
395 /* cancel all tasks */
396 embb_mtapi_scheduler_process_tasks(
397 node->scheduler, embb_mtapi_queue_delete_visitor, local_queue);
398
399 /* wait for tasks in queue to finish */
400 local_status = MTAPI_SUCCESS;
401 while (0 != embb_atomic_load_int(&local_queue->num_tasks)) {
402 if (MTAPI_INFINITE < timeout) {
403 embb_time_t current_time;
404 embb_time_now(¤t_time);
405 if (embb_time_compare(¤t_time, &start_time) < 0) {
406 /* time has moved backwards, maybe a wraparound or jitter
407 move end_time backward to avoid endeless loop */
408 start_time = current_time;
409 embb_time_in(&end_time, &wait_duration);
410 }
411 if (embb_time_compare(¤t_time, &end_time) > 0) {
412 /* timeout! */
413 local_status = MTAPI_TIMEOUT;
414 break;
415 }
416 }
417
418 /* do other work if applicable */
419 embb_mtapi_scheduler_execute_task_or_yield(
420 node->scheduler,
421 node,
422 context);
423 }
424
425 if (MTAPI_SUCCESS == local_status) {
426 /* delete queue */
427 embb_mtapi_queue_finalize(local_queue);
428 embb_mtapi_queue_pool_deallocate(node->queue_pool, local_queue);
429 }
430 } else {
431 local_status = MTAPI_ERR_QUEUE_INVALID;
432 }
433 } else {
434 local_status = MTAPI_ERR_NODE_NOTINIT;
435 }
436
437 mtapi_status_set(status, local_status);
438 }
439
mtapi_queue_disable(MTAPI_IN mtapi_queue_hndl_t queue,MTAPI_IN mtapi_timeout_t timeout,MTAPI_OUT mtapi_status_t * status)440 void mtapi_queue_disable(
441 MTAPI_IN mtapi_queue_hndl_t queue,
442 MTAPI_IN mtapi_timeout_t timeout,
443 MTAPI_OUT mtapi_status_t* status) {
444 mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
445
446 embb_mtapi_log_trace("mtapi_queue_disable() called\n");
447
448 if (embb_mtapi_node_is_initialized()) {
449 embb_mtapi_node_t* node = embb_mtapi_node_get_instance();
450 if (embb_mtapi_queue_pool_is_handle_valid(node->queue_pool, queue)) {
451 embb_mtapi_queue_t* local_queue =
452 embb_mtapi_queue_pool_get_storage_for_handle(
453 node->queue_pool, queue);
454 embb_atomic_store_char(&local_queue->enabled, MTAPI_FALSE);
455
456 /* cancel or retain all tasks scheduled via queue */
457 embb_mtapi_scheduler_process_tasks(
458 node->scheduler, embb_mtapi_queue_disable_visitor, local_queue);
459
460 /* cancel all tasks held back the ordered queue if it is not retaining */
461 if (local_queue->attributes.ordered && !local_queue->attributes.retain) {
462 embb_mtapi_action_t * local_action = MTAPI_NULL;
463 embb_mtapi_task_t * task =
464 embb_mtapi_task_queue_pop_front(&local_queue->ordered_tasks);
465 while (MTAPI_NULL != task) {
466 /* get action for task */
467 if (embb_mtapi_action_pool_is_handle_valid(
468 node->action_pool, task->action)) {
469 local_action =
470 embb_mtapi_action_pool_get_storage_for_handle(
471 node->action_pool, task->action);
472 }
473 /* set state to cancelled */
474 task->error_code = MTAPI_ERR_ACTION_CANCELLED;
475 embb_mtapi_scheduler_finalize_task(task, node, MTAPI_TASK_CANCELLED);
476 /* remove task from action */
477 if (MTAPI_NULL != local_action) {
478 embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1);
479 }
480 /* get next task */
481 task = embb_mtapi_task_queue_pop_front(&local_queue->ordered_tasks);
482 }
483 }
484
485 /* if queue is not retaining, wait for all tasks to finish */
486 if (MTAPI_FALSE == local_queue->attributes.retain) {
487 /* find out on which thread we are */
488 embb_mtapi_thread_context_t * context =
489 embb_mtapi_scheduler_get_current_thread_context(node->scheduler);
490
491 embb_duration_t wait_duration;
492 embb_time_t start_time;
493 embb_time_t end_time;
494 if (MTAPI_INFINITE < timeout) {
495 embb_duration_set_milliseconds(
496 &wait_duration, (unsigned long long)timeout);
497 embb_time_now(&start_time);
498 embb_time_in(&end_time, &wait_duration);
499 }
500
501 /* wait for tasks in queue to finish */
502 local_status = MTAPI_SUCCESS;
503 while (0 != embb_atomic_load_int(&local_queue->num_tasks)) {
504 if (MTAPI_INFINITE < timeout) {
505 embb_time_t current_time;
506 embb_time_now(¤t_time);
507 if (embb_time_compare(¤t_time, &start_time) < 0) {
508 /* time has moved backwards, maybe a wraparound or jitter
509 move end_time backward to avoid endeless loop */
510 start_time = current_time;
511 embb_time_in(&end_time, &wait_duration);
512 }
513 if (embb_time_compare(¤t_time, &end_time) > 0) {
514 /* timeout! */
515 local_status = MTAPI_TIMEOUT;
516 break;
517 }
518 }
519
520 /* do other work if applicable */
521 embb_mtapi_scheduler_execute_task_or_yield(
522 node->scheduler,
523 node,
524 context);
525 }
526 } else {
527 local_status = MTAPI_SUCCESS;
528 }
529 } else {
530 local_status = MTAPI_ERR_QUEUE_INVALID;
531 }
532 } else {
533 local_status = MTAPI_ERR_NODE_NOTINIT;
534 }
535
536 mtapi_status_set(status, local_status);
537 }
538
mtapi_queue_enable(MTAPI_IN mtapi_queue_hndl_t queue,MTAPI_OUT mtapi_status_t * status)539 void mtapi_queue_enable(
540 MTAPI_IN mtapi_queue_hndl_t queue,
541 MTAPI_OUT mtapi_status_t* status) {
542 mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
543
544 embb_mtapi_log_trace("mtapi_queue_enable() called\n");
545
546 if (embb_mtapi_node_is_initialized()) {
547 embb_mtapi_node_t* node = embb_mtapi_node_get_instance();
548 if (embb_mtapi_queue_pool_is_handle_valid(node->queue_pool, queue)) {
549 embb_mtapi_queue_t* local_queue =
550 embb_mtapi_queue_pool_get_storage_for_handle(
551 node->queue_pool, queue);
552 embb_atomic_store_char(&local_queue->enabled, MTAPI_TRUE);
553 local_status = MTAPI_SUCCESS;
554 if (local_queue->attributes.retain) {
555 /* reschedule retained tasks */
556 embb_mtapi_task_t * task =
557 embb_mtapi_task_queue_pop_front(&local_queue->retained_tasks);
558 while (MTAPI_NULL != task) {
559 embb_mtapi_task_set_state(task, MTAPI_TASK_SCHEDULED);
560 embb_mtapi_scheduler_schedule_task(node->scheduler, task);
561 task = embb_mtapi_task_queue_pop_front(&local_queue->retained_tasks);
562 }
563 }
564 } else {
565 local_status = MTAPI_ERR_QUEUE_INVALID;
566 }
567 } else {
568 local_status = MTAPI_ERR_NODE_NOTINIT;
569 }
570
571 mtapi_status_set(status, local_status);
572 }
573