1 /*
2  * Copyright (c) 2014-2017, Siemens AG. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are met:
6  *
7  * 1. Redistributions of source code must retain the above copyright notice,
8  * this list of conditions and the following disclaimer.
9  *
10  * 2. Redistributions in binary form must reproduce the above copyright notice,
11  * this list of conditions and the following disclaimer in the documentation
12  * and/or other materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
15  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
18  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
19  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
20  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
21  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
22  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
23  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
24  * POSSIBILITY OF SUCH DAMAGE.
25  */
26 
27 #include <assert.h>
28 
29 #include <embb/mtapi/c/mtapi.h>
30 #include <embb/base/c/atomic.h>
31 
32 #include <embb/base/c/internal/unused.h>
33 
34 #include <embb_mtapi_log.h>
35 #include <mtapi_status_t.h>
36 #include <embb_mtapi_queue_t.h>
37 #include <embb_mtapi_group_t.h>
38 #include <embb_mtapi_node_t.h>
39 #include <embb_mtapi_task_t.h>
40 #include <embb_mtapi_job_t.h>
41 #include <embb_mtapi_action_t.h>
42 #include <embb_mtapi_pool_template-inl.h>
43 #include <embb_mtapi_task_queue_t.h>
44 #include <embb_mtapi_scheduler_t.h>
45 #include <embb_mtapi_thread_context_t.h>
46 #include <embb_mtapi_attr.h>
47 
48 
49 /* ---- POOL STORAGE FUNCTIONS --------------------------------------------- */
50 
/* Pool template instantiation for the queue type. The embb_mtapi_queue_pool_*
 * functions used further down in this file are presumably generated by this
 * macro (see embb_mtapi_pool_template-inl.h) -- TODO confirm. */
embb_mtapi_pool_implementation(queue)
52 
53 
54 /* ---- CLASS MEMBERS ------------------------------------------------------ */
55 
56 void embb_mtapi_queue_initialize(embb_mtapi_queue_t* that) {
57   assert(MTAPI_NULL != that);
58 
59   mtapi_queueattr_init(&that->attributes, MTAPI_NULL);
60   that->queue_id = MTAPI_QUEUE_ID_NONE;
61   embb_atomic_init_int(&that->ordered_task_executing, 0);
62   embb_atomic_init_char(&that->enabled, MTAPI_FALSE);
63   embb_atomic_init_int(&that->num_tasks, 0);
64   that->job_handle.id = 0;
65   that->job_handle.tag = 0;
66   embb_mtapi_task_queue_initialize(&that->retained_tasks);
67   embb_mtapi_task_queue_initialize(&that->ordered_tasks);
68 }
69 
/*
 * Initializes a queue object from caller supplied attributes and binds it to
 * a job.
 *
 * In contrast to embb_mtapi_queue_initialize() the enabled flag is
 * initialized to MTAPI_TRUE. The queue id is still set to
 * MTAPI_QUEUE_ID_NONE; assigning the real id is left to the caller.
 *
 * that:       queue to initialize; must not be MTAPI_NULL (asserted).
 * attributes: attributes copied by value into the queue; must not be
 *             MTAPI_NULL (asserted).
 * job:        job handle stored in the queue.
 */
void embb_mtapi_queue_initialize_with_attributes_and_job(
  embb_mtapi_queue_t* that,
  mtapi_queue_attributes_t* attributes,
  mtapi_job_hndl_t job) {
  assert(MTAPI_NULL != that);
  assert(MTAPI_NULL != attributes);

  /* identity and configuration */
  that->queue_id = MTAPI_QUEUE_ID_NONE;
  that->job_handle = job;
  that->attributes = *attributes;

  /* runtime state: enabled, no task counted, no ordered task running */
  embb_atomic_init_char(&that->enabled, MTAPI_TRUE);
  embb_atomic_init_int(&that->num_tasks, 0);
  embb_atomic_init_int(&that->ordered_task_executing, 0);

  /* task containers */
  embb_mtapi_task_queue_initialize(&that->retained_tasks);
  embb_mtapi_task_queue_initialize(&that->ordered_tasks);
}
86 
/*
 * Tears down a queue object.
 *
 * Finalizes both task queues, destroys the three atomic members, zeroes the
 * job handle and resets the queue id to MTAPI_QUEUE_ID_NONE.
 *
 * that: queue to finalize; must not be MTAPI_NULL (asserted).
 */
void embb_mtapi_queue_finalize(embb_mtapi_queue_t* that) {
  assert(MTAPI_NULL != that);

  /* task containers */
  embb_mtapi_task_queue_finalize(&that->ordered_tasks);
  embb_mtapi_task_queue_finalize(&that->retained_tasks);

  /* atomic runtime state */
  embb_atomic_destroy_int(&that->num_tasks);
  embb_atomic_destroy_char(&that->enabled);
  embb_atomic_destroy_int(&that->ordered_task_executing);

  /* identity: the queue no longer refers to a job or carries an id */
  that->job_handle.id = 0;
  that->job_handle.tag = 0;
  that->queue_id = MTAPI_QUEUE_ID_NONE;
}
99 
/*
 * Atomically increments the queue's task counter by one.
 *
 * that: queue whose num_tasks counter is incremented; must not be
 *       MTAPI_NULL (asserted).
 */
void embb_mtapi_queue_task_started(embb_mtapi_queue_t* that) {
  assert(MTAPI_NULL != that);

  /* the previous counter value is of no interest here */
  (void)embb_atomic_fetch_and_add_int(&that->num_tasks, 1);
}
104 
/*
 * Atomically decrements the queue's task counter by one.
 *
 * that: queue whose num_tasks counter is decremented; must not be
 *       MTAPI_NULL (asserted).
 */
void embb_mtapi_queue_task_finished(embb_mtapi_queue_t* that) {
  assert(MTAPI_NULL != that);

  /* the previous counter value is of no interest here */
  (void)embb_atomic_fetch_and_add_int(&that->num_tasks, -1);
}
109 
/*
 * Tries to claim the queue's "ordered task executing" flag.
 *
 * Performs an atomic compare-and-swap on ordered_task_executing that replaces
 * the value 0 with 1.
 *
 * that: queue whose flag is claimed; must not be MTAPI_NULL (asserted, in
 *       line with the other queue member functions).
 *
 * Returns the result of embb_atomic_compare_and_swap_int(); presumably
 * non-zero when the flag was 0 and has been set -- confirm against the
 * EMBB atomic documentation.
 */
int embb_mtapi_queue_ordered_task_start(
  embb_mtapi_queue_t* that) {
  int expected = 0;

  assert(MTAPI_NULL != that);

  return embb_atomic_compare_and_swap_int(
    &that->ordered_task_executing, &expected, 1);
}
116 
/*
 * Releases the queue's "ordered task executing" flag by atomically storing 0
 * into ordered_task_executing.
 *
 * that: queue whose flag is released; must not be MTAPI_NULL (asserted, in
 *       line with the other queue member functions).
 */
void embb_mtapi_queue_ordered_task_finish(
  embb_mtapi_queue_t* that) {
  assert(MTAPI_NULL != that);

  embb_atomic_store_int(&that->ordered_task_executing, 0);
}
121 
embb_mtapi_queue_delete_visitor(embb_mtapi_task_t * task,void * user_data)122 static mtapi_boolean_t embb_mtapi_queue_delete_visitor(
123   embb_mtapi_task_t * task,
124   void * user_data) {
125   embb_mtapi_queue_t * queue = (embb_mtapi_queue_t*)user_data;
126 
127   assert(MTAPI_NULL != queue);
128   assert(MTAPI_NULL != task);
129 
130   if (task->queue.id == queue->handle.id &&
131       task->queue.tag == queue->handle.tag) {
132     /* task is scheduled and needs to be cancelled */
133     embb_mtapi_task_set_state(task, MTAPI_TASK_CANCELLED);
134     task->error_code = MTAPI_ERR_QUEUE_DELETED;
135   }
136 
137   /* do not remove task from queue */
138   return MTAPI_TRUE;
139 }
140 
embb_mtapi_queue_disable_visitor(embb_mtapi_task_t * task,void * user_data)141 static mtapi_boolean_t embb_mtapi_queue_disable_visitor(
142   embb_mtapi_task_t * task,
143   void * user_data) {
144   embb_mtapi_queue_t * queue = (embb_mtapi_queue_t*)user_data;
145   /* do not remove task from queue by default */
146   mtapi_boolean_t result = MTAPI_TRUE;
147 
148   assert(MTAPI_NULL != queue);
149   assert(MTAPI_NULL != task);
150 
151   if (task->queue.id == queue->handle.id &&
152       task->queue.tag == queue->handle.tag) {
153     if (queue->attributes.retain) {
154       /* task is scheduled and needs to be retained */
155       embb_mtapi_task_set_state(task, MTAPI_TASK_RETAINED);
156       embb_mtapi_task_queue_push_back(&queue->retained_tasks, task);
157       /* remove task from queue */
158       result = MTAPI_FALSE;
159     } else {
160       /* task is scheduled and needs to be cancelled */
161       embb_mtapi_task_set_state(task, MTAPI_TASK_CANCELLED);
162       task->error_code = MTAPI_ERR_QUEUE_DISABLED;
163     }
164   }
165 
166   return result;
167 }
168 
169 
170 /* ---- INTERFACE FUNCTIONS ------------------------------------------------ */
171 
mtapi_queue_create(MTAPI_IN mtapi_queue_id_t queue_id,MTAPI_IN mtapi_job_hndl_t job,MTAPI_IN mtapi_queue_attributes_t * attributes,MTAPI_OUT mtapi_status_t * status)172 mtapi_queue_hndl_t mtapi_queue_create(
173   MTAPI_IN mtapi_queue_id_t queue_id,
174   MTAPI_IN mtapi_job_hndl_t job,
175   MTAPI_IN mtapi_queue_attributes_t* attributes,
176   MTAPI_OUT mtapi_status_t* status) {
177   mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
178   embb_mtapi_node_t* node = embb_mtapi_node_get_instance();
179   embb_mtapi_queue_t* queue = MTAPI_NULL;
180   mtapi_queue_hndl_t queue_hndl = { 0, EMBB_MTAPI_IDPOOL_INVALID_ID };
181   mtapi_queue_attributes_t attr;
182 
183   embb_mtapi_log_trace("mtapi_queue_create() called\n");
184 
185   if (embb_mtapi_node_is_initialized()) {
186     queue = embb_mtapi_queue_pool_allocate(node->queue_pool);
187     if (MTAPI_NULL != queue) {
188       if (MTAPI_NULL != attributes) {
189         attr = *attributes;
190         local_status = MTAPI_SUCCESS;
191       } else {
192         mtapi_queueattr_init(&attr, &local_status);
193       }
194       if (MTAPI_SUCCESS == local_status) {
195         if (embb_mtapi_job_is_handle_valid(node, job)) {
196           embb_mtapi_queue_initialize_with_attributes_and_job(
197             queue, &attr, job);
198           /* for an ordered queue, initialize affinity */
199           if (queue->attributes.ordered) {
200             mtapi_affinity_init(
201               &queue->ordered_affinity,
202               MTAPI_FALSE, MTAPI_NULL);
203             mtapi_affinity_set(
204               &queue->ordered_affinity,
205               queue->handle.id % node->scheduler->worker_count,
206               MTAPI_TRUE, MTAPI_NULL);
207           }
208           queue->queue_id = queue_id;
209           queue_hndl = queue->handle;
210         } else {
211           embb_mtapi_queue_pool_deallocate(node->queue_pool, queue);
212           local_status = MTAPI_ERR_JOB_INVALID;
213         }
214       } else {
215         embb_mtapi_queue_pool_deallocate(node->queue_pool, queue);
216       }
217     } else {
218       local_status = MTAPI_ERR_QUEUE_LIMIT;
219     }
220   } else {
221     local_status = MTAPI_ERR_NODE_NOTINIT;
222   }
223 
224   mtapi_status_set(status, local_status);
225   return queue_hndl;
226 }
227 
/*
 * Changes a single attribute of an existing queue.
 *
 * queue:          handle of the queue to modify.
 * attribute_num:  attribute selector, forwarded to mtapi_queueattr_set().
 * attribute:      new attribute value, forwarded to mtapi_queueattr_set().
 * attribute_size: size of the value, forwarded to mtapi_queueattr_set().
 * status:         receives the result:
 *                   MTAPI_ERR_NODE_NOTINIT   node is not initialized,
 *                   MTAPI_ERR_QUEUE_INVALID  handle is not valid,
 *                 otherwise whatever mtapi_queueattr_set() reports.
 */
void mtapi_queue_set_attribute(
  MTAPI_IN mtapi_queue_hndl_t queue,
  MTAPI_IN mtapi_uint_t attribute_num,
  MTAPI_IN void* attribute,
  MTAPI_IN mtapi_size_t attribute_size,
  MTAPI_OUT mtapi_status_t* status) {
  mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
  embb_mtapi_node_t* node = embb_mtapi_node_get_instance();
  embb_mtapi_queue_t* target;

  embb_mtapi_log_trace("mtapi_queue_set_attribute() called\n");

  if (!embb_mtapi_node_is_initialized()) {
    local_status = MTAPI_ERR_NODE_NOTINIT;
  } else if (!embb_mtapi_queue_pool_is_handle_valid(
    node->queue_pool, queue)) {
    local_status = MTAPI_ERR_QUEUE_INVALID;
  } else {
    /* the attribute setter validates selector and size and reports the
       outcome through local_status */
    target = embb_mtapi_queue_pool_get_storage_for_handle(
      node->queue_pool, queue);
    mtapi_queueattr_set(&target->attributes, attribute_num,
      attribute, attribute_size, &local_status);
  }

  mtapi_status_set(status, local_status);
}
255 
/*
 * Reads a single attribute of an existing queue.
 *
 * queue:          handle of the queue to query.
 * attribute_num:  one of MTAPI_QUEUE_GLOBAL, MTAPI_QUEUE_PRIORITY,
 *                 MTAPI_QUEUE_LIMIT, MTAPI_QUEUE_ORDERED, MTAPI_QUEUE_RETAIN
 *                 or MTAPI_QUEUE_DOMAIN_SHARED.
 * attribute:      destination for the attribute value.
 * attribute_size: size of the destination, passed on to the typed getter.
 * status:         receives the result:
 *                   MTAPI_ERR_NODE_NOTINIT   node is not initialized,
 *                   MTAPI_ERR_QUEUE_INVALID  handle is not valid,
 *                   MTAPI_ERR_PARAMETER      attribute is MTAPI_NULL,
 *                   MTAPI_ERR_ATTR_NUM       unknown attribute_num,
 *                 otherwise whatever the typed embb_mtapi_attr_get_*
 *                 helper returns.
 */
void mtapi_queue_get_attribute(
  MTAPI_IN mtapi_queue_hndl_t queue,
  MTAPI_IN mtapi_uint_t attribute_num,
  MTAPI_OUT void* attribute,
  MTAPI_IN mtapi_size_t attribute_size,
  MTAPI_OUT mtapi_status_t* status
  ) {
  mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
  embb_mtapi_node_t* node = embb_mtapi_node_get_instance();
  embb_mtapi_queue_t* source;

  embb_mtapi_log_trace("mtapi_queue_get_attribute() called\n");

  if (!embb_mtapi_node_is_initialized()) {
    local_status = MTAPI_ERR_NODE_NOTINIT;
  } else if (!embb_mtapi_queue_pool_is_handle_valid(
    node->queue_pool, queue)) {
    local_status = MTAPI_ERR_QUEUE_INVALID;
  } else {
    source = embb_mtapi_queue_pool_get_storage_for_handle(
      node->queue_pool, queue);

    if (MTAPI_NULL == attribute) {
      /* nowhere to copy the value to */
      local_status = MTAPI_ERR_PARAMETER;
    } else {
      /* dispatch to the getter matching the attribute's type */
      switch (attribute_num) {
      case MTAPI_QUEUE_GLOBAL:
        local_status = embb_mtapi_attr_get_mtapi_boolean_t(
          &source->attributes.global, attribute, attribute_size);
        break;

      case MTAPI_QUEUE_PRIORITY:
        local_status = embb_mtapi_attr_get_mtapi_uint_t(
          &source->attributes.priority, attribute, attribute_size);
        break;

      case MTAPI_QUEUE_LIMIT:
        local_status = embb_mtapi_attr_get_mtapi_uint_t(
          &source->attributes.limit, attribute, attribute_size);
        break;

      case MTAPI_QUEUE_ORDERED:
        local_status = embb_mtapi_attr_get_mtapi_boolean_t(
          &source->attributes.ordered, attribute, attribute_size);
        break;

      case MTAPI_QUEUE_RETAIN:
        local_status = embb_mtapi_attr_get_mtapi_boolean_t(
          &source->attributes.retain, attribute, attribute_size);
        break;

      case MTAPI_QUEUE_DOMAIN_SHARED:
        local_status = embb_mtapi_attr_get_mtapi_boolean_t(
          &source->attributes.domain_shared, attribute, attribute_size);
        break;

      default:
        local_status = MTAPI_ERR_ATTR_NUM;
        break;
      }
    }
  }

  mtapi_status_set(status, local_status);
}
322 
mtapi_queue_get(MTAPI_IN mtapi_queue_id_t queue_id,MTAPI_IN mtapi_domain_t domain_id,MTAPI_OUT mtapi_status_t * status)323 mtapi_queue_hndl_t mtapi_queue_get(
324   MTAPI_IN mtapi_queue_id_t queue_id,
325   MTAPI_IN mtapi_domain_t domain_id,
326   MTAPI_OUT mtapi_status_t* status) {
327   mtapi_queue_hndl_t queue_hndl = { 0, EMBB_MTAPI_IDPOOL_INVALID_ID };
328   mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
329 
330   EMBB_UNUSED(domain_id);
331 
332   embb_mtapi_log_trace("mtapi_queue_get() called\n");
333 
334   if (embb_mtapi_node_is_initialized()) {
335     embb_mtapi_node_t* node = embb_mtapi_node_get_instance();
336     mtapi_uint_t ii;
337 
338     local_status = MTAPI_ERR_QUEUE_INVALID;
339     for (ii = 0; ii < node->attributes.max_queues; ii++) {
340       if (queue_id == node->queue_pool->storage[ii].queue_id) {
341         queue_hndl = node->queue_pool->storage[ii].handle;
342         local_status = MTAPI_SUCCESS;
343         break;
344       }
345     }
346   } else {
347     local_status = MTAPI_ERR_NODE_NOTINIT;
348   }
349 
350   mtapi_status_set(status, local_status);
351   return queue_hndl;
352 }
353 
/*
 * Deletes a queue.
 *
 * Steps performed for a valid handle:
 *  1. for a retaining queue, all retained tasks are put back into state
 *     MTAPI_TASK_SCHEDULED and handed to the scheduler;
 *  2. every task scheduled via this queue is cancelled with
 *     MTAPI_ERR_QUEUE_DELETED (see embb_mtapi_queue_delete_visitor);
 *  3. the calling thread executes other tasks or yields until the queue's
 *     task counter drops to 0 or the timeout expires;
 *  4. only if the counter reached 0, the queue is finalized and returned to
 *     the pool.
 *
 * queue:   handle of the queue to delete.
 * timeout: maximum wait in milliseconds; values not greater than
 *          MTAPI_INFINITE disable the deadline.
 * status:  receives the result:
 *            MTAPI_SUCCESS            queue was deleted,
 *            MTAPI_TIMEOUT            tasks did not finish in time, the
 *                                     queue is NOT finalized,
 *            MTAPI_ERR_QUEUE_INVALID  handle is not valid,
 *            MTAPI_ERR_NODE_NOTINIT   node is not initialized.
 */
void mtapi_queue_delete(
  MTAPI_IN mtapi_queue_hndl_t queue,
  MTAPI_IN mtapi_timeout_t timeout,
  MTAPI_OUT mtapi_status_t* status) {
  mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;

  embb_mtapi_log_trace("mtapi_queue_delete() called\n");

  if (!embb_mtapi_node_is_initialized()) {
    local_status = MTAPI_ERR_NODE_NOTINIT;
  } else {
    embb_mtapi_node_t* node = embb_mtapi_node_get_instance();

    if (!embb_mtapi_queue_pool_is_handle_valid(node->queue_pool, queue)) {
      local_status = MTAPI_ERR_QUEUE_INVALID;
    } else {
      embb_mtapi_queue_t* doomed_queue =
        embb_mtapi_queue_pool_get_storage_for_handle(
          node->queue_pool, queue);
      const mtapi_boolean_t has_deadline =
        (MTAPI_INFINITE < timeout) ? MTAPI_TRUE : MTAPI_FALSE;
      embb_mtapi_thread_context_t * context;
      embb_mtapi_task_t * task;

      /* the three time values are only meaningful if has_deadline is set */
      embb_duration_t wait_duration;
      embb_time_t start_time;
      embb_time_t end_time;
      if (has_deadline) {
        embb_duration_set_milliseconds(
          &wait_duration, (unsigned long long)timeout);
        embb_time_now(&start_time);
        embb_time_in(&end_time, &wait_duration);
      }

      /* find out on which thread we are */
      context = embb_mtapi_scheduler_get_current_thread_context(
        node->scheduler);

      if (doomed_queue->attributes.retain) {
        /* reschedule retained tasks so the cancel pass below sees them */
        for (task = embb_mtapi_task_queue_pop_front(
               &doomed_queue->retained_tasks);
             MTAPI_NULL != task;
             task = embb_mtapi_task_queue_pop_front(
               &doomed_queue->retained_tasks)) {
          embb_mtapi_task_set_state(task, MTAPI_TASK_SCHEDULED);
          embb_mtapi_scheduler_schedule_task(node->scheduler, task);
        }
      }

      /* cancel all tasks */
      embb_mtapi_scheduler_process_tasks(
        node->scheduler, embb_mtapi_queue_delete_visitor, doomed_queue);

      /* wait for tasks in queue to finish */
      local_status = MTAPI_SUCCESS;
      while (0 != embb_atomic_load_int(&doomed_queue->num_tasks)) {
        if (has_deadline) {
          embb_time_t now;
          embb_time_now(&now);
          if (embb_time_compare(&now, &start_time) < 0) {
            /* time has moved backwards, maybe a wraparound or jitter;
               recompute end_time to avoid an endless loop */
            start_time = now;
            embb_time_in(&end_time, &wait_duration);
          }
          if (embb_time_compare(&now, &end_time) > 0) {
            /* timeout! */
            local_status = MTAPI_TIMEOUT;
            break;
          }
        }

        /* do other work if applicable */
        embb_mtapi_scheduler_execute_task_or_yield(
          node->scheduler,
          node,
          context);
      }

      if (MTAPI_SUCCESS == local_status) {
        /* no task refers to the queue anymore, delete it */
        embb_mtapi_queue_finalize(doomed_queue);
        embb_mtapi_queue_pool_deallocate(node->queue_pool, doomed_queue);
      }
    }
  }

  mtapi_status_set(status, local_status);
}
439 
/*
 * Disables a queue.
 *
 * Steps performed for a valid handle:
 *  1. the enabled flag is atomically set to MTAPI_FALSE;
 *  2. every task scheduled via this queue is either retained or cancelled
 *     (see embb_mtapi_queue_disable_visitor);
 *  3. for an ordered, non-retaining queue, all tasks held back in
 *     ordered_tasks are finalized as MTAPI_TASK_CANCELLED and the task
 *     counter of their action (if the action handle is valid) is
 *     decremented;
 *  4. for a non-retaining queue, the calling thread executes other tasks or
 *     yields until the queue's task counter drops to 0 or the timeout
 *     expires. A retaining queue returns immediately.
 *
 * queue:   handle of the queue to disable.
 * timeout: maximum wait in milliseconds; values not greater than
 *          MTAPI_INFINITE disable the deadline.
 * status:  receives the result:
 *            MTAPI_SUCCESS            queue was disabled,
 *            MTAPI_TIMEOUT            tasks did not finish in time (the
 *                                     queue stays disabled),
 *            MTAPI_ERR_QUEUE_INVALID  handle is not valid,
 *            MTAPI_ERR_NODE_NOTINIT   node is not initialized.
 */
void mtapi_queue_disable(
  MTAPI_IN mtapi_queue_hndl_t queue,
  MTAPI_IN mtapi_timeout_t timeout,
  MTAPI_OUT mtapi_status_t* status) {
  mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;

  embb_mtapi_log_trace("mtapi_queue_disable() called\n");

  if (embb_mtapi_node_is_initialized()) {
    embb_mtapi_node_t* node = embb_mtapi_node_get_instance();
    if (embb_mtapi_queue_pool_is_handle_valid(node->queue_pool, queue)) {
      embb_mtapi_queue_t* local_queue =
        embb_mtapi_queue_pool_get_storage_for_handle(
          node->queue_pool, queue);
      embb_atomic_store_char(&local_queue->enabled, MTAPI_FALSE);

      /* cancel or retain all tasks scheduled via queue */
      embb_mtapi_scheduler_process_tasks(
        node->scheduler, embb_mtapi_queue_disable_visitor, local_queue);

      /* cancel all tasks held back by the ordered queue if it is not
         retaining */
      if (local_queue->attributes.ordered && !local_queue->attributes.retain) {
        embb_mtapi_task_t * task =
          embb_mtapi_task_queue_pop_front(&local_queue->ordered_tasks);
        while (MTAPI_NULL != task) {
          /* looked up anew for every task: a task with an invalid action
             handle must not touch the action of the previous task */
          embb_mtapi_action_t * local_action = MTAPI_NULL;

          /* get action for task */
          if (embb_mtapi_action_pool_is_handle_valid(
            node->action_pool, task->action)) {
            local_action =
              embb_mtapi_action_pool_get_storage_for_handle(
                node->action_pool, task->action);
          }
          /* set state to cancelled */
          /* NOTE(review): tasks cancelled by the disable visitor carry
             MTAPI_ERR_QUEUE_DISABLED, these carry
             MTAPI_ERR_ACTION_CANCELLED -- confirm this is intended */
          task->error_code = MTAPI_ERR_ACTION_CANCELLED;
          embb_mtapi_scheduler_finalize_task(task, node, MTAPI_TASK_CANCELLED);
          /* remove task from action */
          if (MTAPI_NULL != local_action) {
            embb_atomic_fetch_and_add_int(&local_action->num_tasks, -1);
          }
          /* get next task */
          task = embb_mtapi_task_queue_pop_front(&local_queue->ordered_tasks);
        }
      }

      /* if queue is not retaining, wait for all tasks to finish */
      if (MTAPI_FALSE == local_queue->attributes.retain) {
        /* find out on which thread we are */
        embb_mtapi_thread_context_t * context =
          embb_mtapi_scheduler_get_current_thread_context(node->scheduler);

        /* the three time values are only set up and read if a finite
           timeout was requested */
        embb_duration_t wait_duration;
        embb_time_t start_time;
        embb_time_t end_time;
        if (MTAPI_INFINITE < timeout) {
          embb_duration_set_milliseconds(
            &wait_duration, (unsigned long long)timeout);
          embb_time_now(&start_time);
          embb_time_in(&end_time, &wait_duration);
        }

        /* wait for tasks in queue to finish */
        local_status = MTAPI_SUCCESS;
        while (0 != embb_atomic_load_int(&local_queue->num_tasks)) {
          if (MTAPI_INFINITE < timeout) {
            embb_time_t current_time;
            embb_time_now(&current_time);
            if (embb_time_compare(&current_time, &start_time) < 0) {
              /* time has moved backwards, maybe a wraparound or jitter;
                 recompute end_time to avoid an endless loop */
              start_time = current_time;
              embb_time_in(&end_time, &wait_duration);
            }
            if (embb_time_compare(&current_time, &end_time) > 0) {
              /* timeout! */
              local_status = MTAPI_TIMEOUT;
              break;
            }
          }

          /* do other work if applicable */
          embb_mtapi_scheduler_execute_task_or_yield(
            node->scheduler,
            node,
            context);
        }
      } else {
        /* retained tasks are parked, nothing to wait for */
        local_status = MTAPI_SUCCESS;
      }
    } else {
      local_status = MTAPI_ERR_QUEUE_INVALID;
    }
  } else {
    local_status = MTAPI_ERR_NODE_NOTINIT;
  }

  mtapi_status_set(status, local_status);
}
538 
/*
 * Enables a queue.
 *
 * For a valid handle the enabled flag is atomically set to MTAPI_TRUE. If
 * the queue has the retain attribute set, every task in retained_tasks is
 * put back into state MTAPI_TASK_SCHEDULED and handed to the scheduler.
 *
 * queue:  handle of the queue to enable.
 * status: receives the result:
 *           MTAPI_SUCCESS            queue was enabled,
 *           MTAPI_ERR_QUEUE_INVALID  handle is not valid,
 *           MTAPI_ERR_NODE_NOTINIT   node is not initialized.
 */
void mtapi_queue_enable(
  MTAPI_IN mtapi_queue_hndl_t queue,
  MTAPI_OUT mtapi_status_t* status) {
  mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;

  embb_mtapi_log_trace("mtapi_queue_enable() called\n");

  if (!embb_mtapi_node_is_initialized()) {
    local_status = MTAPI_ERR_NODE_NOTINIT;
  } else {
    embb_mtapi_node_t* node = embb_mtapi_node_get_instance();

    if (!embb_mtapi_queue_pool_is_handle_valid(node->queue_pool, queue)) {
      local_status = MTAPI_ERR_QUEUE_INVALID;
    } else {
      embb_mtapi_queue_t* enabled_queue =
        embb_mtapi_queue_pool_get_storage_for_handle(
          node->queue_pool, queue);
      embb_mtapi_task_t * task;

      embb_atomic_store_char(&enabled_queue->enabled, MTAPI_TRUE);
      local_status = MTAPI_SUCCESS;

      if (enabled_queue->attributes.retain) {
        /* reschedule retained tasks in the order they were parked */
        for (task = embb_mtapi_task_queue_pop_front(
               &enabled_queue->retained_tasks);
             MTAPI_NULL != task;
             task = embb_mtapi_task_queue_pop_front(
               &enabled_queue->retained_tasks)) {
          embb_mtapi_task_set_state(task, MTAPI_TASK_SCHEDULED);
          embb_mtapi_scheduler_schedule_task(node->scheduler, task);
        }
      }
    }
  }

  mtapi_status_set(status, local_status);
}
573