1 /*
2  * Copyright (c) 2014-2017, Siemens AG. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are met:
6  *
7  * 1. Redistributions of source code must retain the above copyright notice,
8  * this list of conditions and the following disclaimer.
9  *
10  * 2. Redistributions in binary form must reproduce the above copyright notice,
11  * this list of conditions and the following disclaimer in the documentation
12  * and/or other materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
15  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
18  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
19  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
20  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
21  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
22  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
23  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
24  * POSSIBILITY OF SUCH DAMAGE.
25  */
26 
27 #include <assert.h>
28 
29 #include <embb/mtapi/c/mtapi.h>
30 
31 #include <embb_mtapi_log.h>
32 #include <mtapi_status_t.h>
33 #include <embb_mtapi_node_t.h>
34 #include <embb_mtapi_task_t.h>
35 #include <embb_mtapi_job_t.h>
36 #include <embb_mtapi_queue_t.h>
37 #include <embb_mtapi_task_queue_t.h>
38 #include <embb_mtapi_thread_context_t.h>
39 #include <embb_mtapi_action_t.h>
40 #include <embb_mtapi_group_t.h>
41 #include <embb_mtapi_scheduler_t.h>
42 #include <embb_mtapi_attr.h>
43 #include <embb_mtapi_task_context_t.h>
44 
45 
46 /* ---- POOL STORAGE FUNCTIONS --------------------------------------------- */
47 
48 #include <embb_mtapi_pool_template-inl.h>
/* Instantiates the pool implementation macro (included just above from
   embb_mtapi_pool_template-inl.h) for the element type "task".
   NOTE(review): presumably this is what provides the embb_mtapi_task_pool_*
   functions (allocate, deallocate, is_handle_valid, get_storage_for_handle)
   used throughout this file - confirm against the template header. */
embb_mtapi_pool_implementation(task)
50 
51 
52 /* ---- CLASS MEMBERS ------------------------------------------------------ */
53 
54 embb_mtapi_task_t* embb_mtapi_task_new(embb_mtapi_task_pool_t* pool) {
55   embb_mtapi_task_t* that;
56 
57   assert(MTAPI_NULL != pool);
58 
59   that = embb_mtapi_task_pool_allocate(pool);
60   if (MTAPI_NULL != that) {
61     embb_mtapi_task_initialize(that);
62   }
63 
64   return that;
65 }
66 
/*
 * Finalizes a task and hands its storage back to the pool.
 *
 * that  task to dispose of, must not be MTAPI_NULL (asserted)
 * pool  pool the task is returned to, must not be MTAPI_NULL (asserted)
 */
void embb_mtapi_task_delete(
  embb_mtapi_task_t* that,
  embb_mtapi_task_pool_t* pool) {
  assert(MTAPI_NULL != that);
  assert(MTAPI_NULL != pool);

  /* tear down the task's members first, then release the storage */
  embb_mtapi_task_finalize(that);

  embb_mtapi_task_pool_deallocate(pool, that);
}
76 
/*
 * Puts a task into its initial state: all associated handles (action, job,
 * group, queue) are marked invalid, the state is MTAPI_TASK_ERROR, the error
 * code is MTAPI_SUCCESS and both instance counters are zero.
 *
 * that  task to initialize, must not be MTAPI_NULL (asserted)
 */
void embb_mtapi_task_initialize(embb_mtapi_task_t* that) {
  assert(MTAPI_NULL != that);

  /* the task is not associated with any other MTAPI object yet */
  that->action.id = EMBB_MTAPI_IDPOOL_INVALID_ID;
  that->job.id = EMBB_MTAPI_IDPOOL_INVALID_ID;
  that->group.id = EMBB_MTAPI_IDPOOL_INVALID_ID;
  that->queue.id = EMBB_MTAPI_IDPOOL_INVALID_ID;

  /* plain bookkeeping members */
  that->task_id = MTAPI_TASK_ID_NONE;
  that->error_code = MTAPI_SUCCESS;
  that->next = MTAPI_NULL;

  /* atomic members */
  embb_atomic_init_int(&that->state, MTAPI_TASK_ERROR);
  embb_atomic_init_unsigned_int(&that->current_instance, 0);
  embb_atomic_init_unsigned_int(&that->instances_todo, 0);
}
91 
/*
 * Tears down a task: all associated handles (action, job, group, queue) are
 * marked invalid, the plain members are reset to their defaults and the
 * atomic members are destroyed.
 *
 * that  task to finalize, must not be MTAPI_NULL (asserted)
 */
void embb_mtapi_task_finalize(embb_mtapi_task_t* that) {
  assert(MTAPI_NULL != that);

  /* drop all associations with other MTAPI objects */
  that->action.id = EMBB_MTAPI_IDPOOL_INVALID_ID;
  that->job.id = EMBB_MTAPI_IDPOOL_INVALID_ID;
  that->group.id = EMBB_MTAPI_IDPOOL_INVALID_ID;
  that->queue.id = EMBB_MTAPI_IDPOOL_INVALID_ID;

  /* plain bookkeeping members */
  that->task_id = MTAPI_TASK_ID_NONE;
  that->error_code = MTAPI_SUCCESS;
  that->next = MTAPI_NULL;

  /* atomic members */
  embb_atomic_destroy_int(&that->state);
  embb_atomic_destroy_unsigned_int(&that->current_instance);
  embb_atomic_destroy_unsigned_int(&that->instances_todo);
}
106 
embb_mtapi_task_execute(embb_mtapi_task_t * that,embb_mtapi_task_context_t * context,mtapi_task_state_t * new_task_state)107 mtapi_boolean_t embb_mtapi_task_execute(
108   embb_mtapi_task_t* that,
109   embb_mtapi_task_context_t * context,
110   mtapi_task_state_t * new_task_state) {
111   unsigned int todo = that->attributes.num_instances;
112 
113   assert(MTAPI_NULL != that);
114   assert(MTAPI_NULL != context);
115 
116   embb_mtapi_task_set_state(that, MTAPI_TASK_RUNNING);
117 
118   /* is the associated action valid? */
119   if (embb_mtapi_action_pool_is_handle_valid(
120     context->thread_context->node->action_pool, that->action)) {
121     /* fetch action and execute */
122     embb_mtapi_action_t* local_action =
123       embb_mtapi_action_pool_get_storage_for_handle(
124       context->thread_context->node->action_pool, that->action);
125 
126     /* only continue if there was no error so far */
127     if (that->error_code == MTAPI_SUCCESS) {
128       local_action->action_function(
129         that->arguments,
130         that->arguments_size,
131         that->result_buffer,
132         that->result_size,
133         local_action->node_local_data,
134         local_action->node_local_data_size,
135         context);
136     }
137     embb_atomic_memory_barrier();
138     todo = embb_atomic_fetch_and_add_unsigned_int(
139       &that->instances_todo, (unsigned int)-1);
140 
141     if (todo == 1) {
142       /* task has completed */
143       if (MTAPI_SUCCESS == that->error_code) {
144         *new_task_state = MTAPI_TASK_COMPLETED;
145       } else if (MTAPI_ERR_ACTION_CANCELLED == that->error_code) {
146         *new_task_state = MTAPI_TASK_CANCELLED;
147       } else {
148         *new_task_state = MTAPI_TASK_ERROR;
149       }
150     }
151   } else {
152     /* action was deleted, task did not complete */
153     that->error_code = MTAPI_ERR_ACTION_DELETED;
154     *new_task_state = MTAPI_TASK_ERROR;
155     todo = 1;
156   }
157 
158   if (todo == 1) {
159     return MTAPI_TRUE;
160   } else {
161     return MTAPI_FALSE;
162   }
163 }
164 
/*
 * Atomically stores a new state in the task.
 *
 * that   task whose state is changed, must not be MTAPI_NULL (asserted)
 * state  state value to store
 */
void embb_mtapi_task_set_state(
  embb_mtapi_task_t* that,
  mtapi_task_state_t state) {
  assert(MTAPI_NULL != that);

  /* the state member is an atomic, so it is written through the atomic API */
  embb_atomic_store_int(&that->state, state);
}
172 
embb_mtapi_task_start(MTAPI_IN mtapi_task_id_t task_id,MTAPI_IN mtapi_job_hndl_t job,MTAPI_IN void * arguments,MTAPI_IN mtapi_size_t arguments_size,MTAPI_OUT void * result_buffer,MTAPI_IN mtapi_size_t result_size,MTAPI_IN mtapi_task_attributes_t * attributes,MTAPI_IN mtapi_group_hndl_t group,MTAPI_IN mtapi_queue_hndl_t queue,MTAPI_OUT mtapi_status_t * status)173 static mtapi_task_hndl_t embb_mtapi_task_start(
174   MTAPI_IN mtapi_task_id_t task_id,
175   MTAPI_IN mtapi_job_hndl_t job,
176   MTAPI_IN void* arguments,
177   MTAPI_IN mtapi_size_t arguments_size,
178   MTAPI_OUT void* result_buffer,
179   MTAPI_IN mtapi_size_t result_size,
180   MTAPI_IN mtapi_task_attributes_t* attributes,
181   MTAPI_IN mtapi_group_hndl_t group,
182   MTAPI_IN mtapi_queue_hndl_t queue,
183   MTAPI_OUT mtapi_status_t* status) {
184   mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
185   mtapi_task_hndl_t task_hndl = { 0, EMBB_MTAPI_IDPOOL_INVALID_ID };
186 
187   embb_mtapi_log_trace("embb_mtapi_task_start() called\n");
188 
189   if (embb_mtapi_node_is_initialized()) {
190     embb_mtapi_node_t* node = embb_mtapi_node_get_instance();
191     if (embb_mtapi_job_is_handle_valid(node, job)) {
192       embb_mtapi_job_t* local_job =
193         embb_mtapi_job_get_storage_for_id(node, job.id);
194       embb_mtapi_task_t* task = embb_mtapi_task_pool_allocate(node->task_pool);
195       if (MTAPI_NULL != task) {
196         mtapi_uint_t action_index;
197 
198         embb_mtapi_task_initialize(task);
199         embb_mtapi_task_set_state(task, MTAPI_TASK_PRENATAL);
200         task->task_id = task_id;
201         task->job = job;
202         task->arguments = arguments;
203         task->arguments_size = arguments_size;
204         task->result_buffer = result_buffer;
205         task->result_size = result_size;
206 
207         if (MTAPI_NULL != attributes) {
208           task->attributes = *attributes;
209         } else {
210           mtapi_taskattr_init(&task->attributes, &local_status);
211         }
212 
213         embb_atomic_store_unsigned_int(
214           &task->instances_todo, task->attributes.num_instances);
215 
216         if (embb_mtapi_group_pool_is_handle_valid(node->group_pool, group)) {
217           embb_mtapi_group_t* local_group =
218             embb_mtapi_group_pool_get_storage_for_handle(
219             node->group_pool, group);
220           task->group = group;
221           embb_atomic_fetch_and_add_int(&local_group->num_tasks, 1);
222         } else {
223           task->group.id = EMBB_MTAPI_IDPOOL_INVALID_ID;
224         }
225 
226         if (embb_mtapi_queue_pool_is_handle_valid(node->queue_pool, queue)) {
227           embb_mtapi_queue_t* local_queue =
228             embb_mtapi_queue_pool_get_storage_for_handle(
229             node->queue_pool, queue);
230           task->queue = queue;
231           embb_mtapi_queue_task_started(local_queue);
232         } else {
233           task->queue.id = EMBB_MTAPI_IDPOOL_INVALID_ID;
234         }
235 
236         /* load balancing: choose action with minimum tasks */
237         action_index = 0;
238         for (mtapi_uint_t ii = 0; ii < local_job->num_actions; ii++) {
239           if (embb_mtapi_action_pool_is_handle_valid(
240             node->action_pool, local_job->actions[ii])) {
241             embb_mtapi_action_t * act_m =
242               embb_mtapi_action_pool_get_storage_for_handle(
243               node->action_pool, local_job->actions[action_index]);
244             embb_mtapi_action_t * act_i =
245               embb_mtapi_action_pool_get_storage_for_handle(
246               node->action_pool, local_job->actions[ii]);
247             if (embb_atomic_load_int(&act_m->num_tasks) >
248               embb_atomic_load_int(&act_i->num_tasks)) {
249               action_index = ii;
250             }
251           }
252         }
253         if (embb_mtapi_action_pool_is_handle_valid(
254           node->action_pool, local_job->actions[action_index])) {
255           task->action = local_job->actions[action_index];
256           embb_mtapi_task_set_state(task, MTAPI_TASK_CREATED);
257           task_hndl = task->handle;
258           local_status = MTAPI_SUCCESS;
259         } else {
260           local_status = MTAPI_ERR_ACTION_INVALID;
261         }
262 
263         /* check priority for validity */
264         if (node->attributes.max_priorities <= task->attributes.priority) {
265           local_status = MTAPI_ERR_PARAMETER;
266         }
267 
268         if (MTAPI_SUCCESS == local_status) {
269           embb_mtapi_scheduler_t * scheduler = node->scheduler;
270           mtapi_boolean_t was_scheduled;
271           embb_mtapi_action_t * local_action =
272             embb_mtapi_action_pool_get_storage_for_handle(
273               node->action_pool, task->action);
274           int num_instances = (int)task->attributes.num_instances;
275 
276           /* num_instances more tasks in flight for action */
277           embb_atomic_fetch_and_add_int(
278             &local_action->num_tasks, num_instances);
279 
280           embb_mtapi_task_set_state(task, MTAPI_TASK_SCHEDULED);
281 
282           if (local_action->is_plugin_action) {
283             /* schedule plugin task */
284             mtapi_status_t plugin_status = MTAPI_ERR_UNKNOWN;
285             local_action->plugin_task_start_function(
286               task_hndl, &plugin_status);
287             was_scheduled = (MTAPI_SUCCESS == plugin_status) ?
288               MTAPI_TRUE : MTAPI_FALSE;
289           } else {
290             /* schedule local task */
291             was_scheduled =
292               embb_mtapi_scheduler_schedule_task(scheduler, task);
293           }
294 
295           if (was_scheduled) {
296             /* if task is detached, do not return a handle, it will be deleted
297                on completion */
298             if (task->attributes.is_detached) {
299               task_hndl.id = EMBB_MTAPI_IDPOOL_INVALID_ID;
300             }
301 
302             local_status = MTAPI_SUCCESS;
303           } else {
304             /* task could not be pushed */
305             local_status = MTAPI_ERR_TASK_LIMIT;
306             embb_mtapi_task_set_state(task, MTAPI_TASK_ERROR);
307             /* num_instances tasks not in flight for action */
308             embb_atomic_fetch_and_add_int(
309               &local_action->num_tasks, -num_instances);
310           }
311         }
312 
313         if (MTAPI_SUCCESS != local_status) {
314           embb_mtapi_task_delete(task, node->task_pool);
315           task_hndl.id = EMBB_MTAPI_IDPOOL_INVALID_ID;
316         }
317       } else {
318         local_status = MTAPI_ERR_TASK_LIMIT;
319       }
320     } else {
321       local_status = MTAPI_ERR_JOB_INVALID;
322     }
323   } else {
324     local_status = MTAPI_ERR_NODE_NOTINIT;
325   }
326 
327   mtapi_status_set(status, local_status);
328   return task_hndl;
329 }
330 
331 
332 /* ---- INTERFACE FUNCTIONS ------------------------------------------------ */
333 
mtapi_task_start(MTAPI_IN mtapi_task_id_t task_id,MTAPI_IN mtapi_job_hndl_t job,MTAPI_IN void * arguments,MTAPI_IN mtapi_size_t arguments_size,MTAPI_OUT void * result_buffer,MTAPI_IN mtapi_size_t result_size,MTAPI_IN mtapi_task_attributes_t * attributes,MTAPI_IN mtapi_group_hndl_t group,MTAPI_OUT mtapi_status_t * status)334 mtapi_task_hndl_t mtapi_task_start(
335   MTAPI_IN mtapi_task_id_t task_id,
336   MTAPI_IN mtapi_job_hndl_t job,
337   MTAPI_IN void* arguments,
338   MTAPI_IN mtapi_size_t arguments_size,
339   MTAPI_OUT void* result_buffer, /* pointer to result buffer */
340   MTAPI_IN mtapi_size_t result_size,   /* size of one result */
341   MTAPI_IN mtapi_task_attributes_t* attributes,
342   MTAPI_IN mtapi_group_hndl_t group,
343   MTAPI_OUT mtapi_status_t* status) {
344   mtapi_queue_hndl_t queue_hndl = { 0, EMBB_MTAPI_IDPOOL_INVALID_ID };
345 
346   embb_mtapi_log_trace("mtapi_task_start() called\n");
347 
348   return embb_mtapi_task_start(
349     task_id,
350     job,
351     arguments,
352     arguments_size,
353     result_buffer,
354     result_size,
355     attributes,
356     group,
357     queue_hndl,
358     status);
359 }
360 
mtapi_task_enqueue(MTAPI_IN mtapi_task_id_t task_id,MTAPI_IN mtapi_queue_hndl_t queue,MTAPI_IN void * arguments,MTAPI_IN mtapi_size_t arguments_size,MTAPI_OUT void * result_buffer,MTAPI_IN mtapi_size_t result_size,MTAPI_IN mtapi_task_attributes_t * attributes,MTAPI_IN mtapi_group_hndl_t group,MTAPI_OUT mtapi_status_t * status)361 mtapi_task_hndl_t mtapi_task_enqueue(
362   MTAPI_IN mtapi_task_id_t task_id,
363   MTAPI_IN mtapi_queue_hndl_t queue,
364   MTAPI_IN void* arguments,
365   MTAPI_IN mtapi_size_t arguments_size,
366   MTAPI_OUT void* result_buffer, /* pointer to result buffer */
367   MTAPI_IN mtapi_size_t result_size,   /* size of one result */
368   MTAPI_IN mtapi_task_attributes_t* attributes,
369   MTAPI_IN mtapi_group_hndl_t group,
370   MTAPI_OUT mtapi_status_t* status) {
371   mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
372   mtapi_task_hndl_t task_hndl = { 0, EMBB_MTAPI_IDPOOL_INVALID_ID };
373 
374   embb_mtapi_log_trace("mtapi_task_enqueue() called\n");
375 
376   if (embb_mtapi_node_is_initialized()) {
377     embb_mtapi_node_t* node = embb_mtapi_node_get_instance();
378     if (embb_mtapi_queue_pool_is_handle_valid(node->queue_pool, queue)) {
379       embb_mtapi_queue_t* local_queue =
380         embb_mtapi_queue_pool_get_storage_for_handle(node->queue_pool, queue);
381       if ((MTAPI_TRUE == embb_atomic_load_char(&local_queue->enabled)) ||
382         local_queue->attributes.retain) {
383         mtapi_task_attributes_t local_attributes;
384         if (MTAPI_NULL != attributes) {
385           local_attributes = *attributes;
386         } else {
387           mtapi_taskattr_init(&local_attributes, MTAPI_NULL);
388         }
389         local_attributes.priority = local_queue->attributes.priority;
390 
391         task_hndl = embb_mtapi_task_start(
392           task_id,
393           local_queue->job_handle,
394           arguments,
395           arguments_size,
396           result_buffer,
397           result_size,
398           &local_attributes,
399           group,
400           queue,
401           &local_status);
402       } else {
403         local_status = MTAPI_ERR_QUEUE_DISABLED;
404       }
405     } else {
406       local_status = MTAPI_ERR_QUEUE_INVALID;
407     }
408   } else {
409     local_status = MTAPI_ERR_NODE_NOTINIT;
410   }
411 
412   mtapi_status_set(status, local_status);
413   return task_hndl;
414 }
415 
/*
 * Copies one attribute of a task into a caller supplied buffer.
 *
 * task            handle of the task to query
 * attribute_num   attribute selector: MTAPI_TASK_DETACHED,
 *                 MTAPI_TASK_INSTANCES or MTAPI_TASK_PRIORITY
 * attribute       buffer receiving the attribute value
 * attribute_size  size of the buffer, passed on to the embb_mtapi_attr_get_*
 *                 helper for the attribute's type
 * status          receives one of:
 *                 MTAPI_ERR_NODE_NOTINIT  node is not initialized
 *                 MTAPI_ERR_TASK_INVALID  task handle is not valid
 *                 MTAPI_ERR_PARAMETER     attribute is MTAPI_NULL
 *                 MTAPI_ERR_ATTR_NUM      unknown attribute_num
 *                 otherwise the status returned by the embb_mtapi_attr_get_*
 *                 helper
 */
void mtapi_task_get_attribute(
  MTAPI_IN mtapi_task_hndl_t task,
  MTAPI_IN mtapi_uint_t attribute_num,
  MTAPI_OUT void* attribute,
  MTAPI_IN mtapi_size_t attribute_size,
  MTAPI_OUT mtapi_status_t* status) {
  mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;

  embb_mtapi_log_trace("mtapi_task_get_attribute() called\n");

  if (embb_mtapi_node_is_initialized()) {
    embb_mtapi_node_t * node = embb_mtapi_node_get_instance();
    if (embb_mtapi_task_pool_is_handle_valid(node->task_pool, task)) {
      embb_mtapi_task_t* local_task =
        embb_mtapi_task_pool_get_storage_for_handle(node->task_pool, task);

      if (MTAPI_NULL == attribute) {
        local_status = MTAPI_ERR_PARAMETER;
      } else {
        switch (attribute_num) {
        case MTAPI_TASK_DETACHED:
          local_status = embb_mtapi_attr_get_mtapi_boolean_t(
            &local_task->attributes.is_detached, attribute, attribute_size);
          break;

        case MTAPI_TASK_INSTANCES:
          local_status = embb_mtapi_attr_get_mtapi_uint_t(
            &local_task->attributes.num_instances, attribute, attribute_size);
          break;

        case MTAPI_TASK_PRIORITY:
          local_status = embb_mtapi_attr_get_mtapi_uint_t(
            &local_task->attributes.priority, attribute, attribute_size);
          break;

        default:
          local_status = MTAPI_ERR_ATTR_NUM;
          break;
        }
      }
    } else {
      /* the handle that failed validation is a task handle, so report it as
         such (consistent with mtapi_task_wait and mtapi_task_cancel) */
      local_status = MTAPI_ERR_TASK_INVALID;
    }
  } else {
    local_status = MTAPI_ERR_NODE_NOTINIT;
  }

  mtapi_status_set(status, local_status);
}
465 
/*
 * Waits for a task to finish and reports the task's error code.
 *
 * task     handle of the task to wait for
 * timeout  timeout passed on to embb_mtapi_scheduler_wait_for_task()
 * status   receives one of:
 *          MTAPI_ERR_NODE_NOTINIT  node is not initialized
 *          MTAPI_ERR_TASK_INVALID  task handle is not valid
 *          MTAPI_TIMEOUT           the scheduler wait did not succeed
 *          otherwise the error code stored in the task
 *
 * After a successful wait the task is deleted here unless it belongs to a
 * valid group.
 */
void mtapi_task_wait(
  MTAPI_IN mtapi_task_hndl_t task,
  MTAPI_IN mtapi_timeout_t timeout,
  MTAPI_OUT mtapi_status_t* status) {
  mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;

  embb_mtapi_log_trace("mtapi_task_wait() called\n");

  if (!embb_mtapi_node_is_initialized()) {
    local_status = MTAPI_ERR_NODE_NOTINIT;
  } else {
    embb_mtapi_node_t* node = embb_mtapi_node_get_instance();

    if (!embb_mtapi_task_pool_is_handle_valid(node->task_pool, task)) {
      local_status = MTAPI_ERR_TASK_INVALID;
    } else {
      embb_mtapi_task_t* awaited_task =
        embb_mtapi_task_pool_get_storage_for_handle(node->task_pool, task);

      if (!embb_mtapi_scheduler_wait_for_task(awaited_task, timeout)) {
        local_status = MTAPI_TIMEOUT;
      } else {
        /* fetch the result before the task is possibly released below */
        local_status = awaited_task->error_code;

        /* a task that is part of a group is deleted by the group, all other
           tasks are deleted right here */
        if (MTAPI_FALSE == embb_mtapi_group_pool_is_handle_valid(
          node->group_pool, awaited_task->group)) {
          embb_mtapi_task_delete(awaited_task, node->task_pool);
        }
      }
    }
  }

  mtapi_status_set(status, local_status);
}
500 
/*
 * Cancels a task.
 *
 * If the task's action handle is valid and refers to a plugin action, the
 * cancellation is delegated to the action's plugin_task_cancel_function,
 * which also determines the reported status. In every other case the task's
 * error code is set to MTAPI_ERR_ACTION_CANCELLED and its state to
 * MTAPI_TASK_CANCELLED.
 *
 * task    handle of the task to cancel
 * status  receives one of:
 *         MTAPI_ERR_NODE_NOTINIT  node is not initialized
 *         MTAPI_ERR_TASK_INVALID  task handle is not valid
 *         MTAPI_SUCCESS           task was marked as cancelled locally
 *         for plugin actions whatever the plugin cancel function stores
 *         (MTAPI_ERR_UNKNOWN if it stores nothing)
 */
void mtapi_task_cancel(
  MTAPI_IN mtapi_task_hndl_t task,
  MTAPI_OUT mtapi_status_t* status) {
  mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;

  embb_mtapi_log_trace("mtapi_task_cancel() called\n");

  if (!embb_mtapi_node_is_initialized()) {
    local_status = MTAPI_ERR_NODE_NOTINIT;
  } else {
    embb_mtapi_node_t* node = embb_mtapi_node_get_instance();

    if (!embb_mtapi_task_pool_is_handle_valid(node->task_pool, task)) {
      local_status = MTAPI_ERR_TASK_INVALID;
    } else {
      embb_mtapi_task_t* cancelled_task =
        embb_mtapi_task_pool_get_storage_for_handle(node->task_pool, task);
      /* stays MTAPI_NULL unless the task runs on a valid plugin action */
      embb_mtapi_action_t* plugin_action = MTAPI_NULL;

      if (embb_mtapi_action_pool_is_handle_valid(
        node->action_pool, cancelled_task->action)) {
        embb_mtapi_action_t* task_action =
          embb_mtapi_action_pool_get_storage_for_handle(
          node->action_pool, cancelled_task->action);
        if (task_action->is_plugin_action) {
          plugin_action = task_action;
        }
      }

      if (MTAPI_NULL != plugin_action) {
        /* call plugin action cancel function */
        plugin_action->plugin_task_cancel_function(task, &local_status);
      } else {
        /* local action or action no longer valid: mark the task itself */
        cancelled_task->error_code = MTAPI_ERR_ACTION_CANCELLED;
        embb_mtapi_task_set_state(cancelled_task, MTAPI_TASK_CANCELLED);
        local_status = MTAPI_SUCCESS;
      }
    }
  }

  mtapi_status_set(status, local_status);
}
541