1 /*
2 * Copyright (c) 2014-2017, Siemens AG. All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are met:
6 *
7 * 1. Redistributions of source code must retain the above copyright notice,
8 * this list of conditions and the following disclaimer.
9 *
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
15 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
18 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
19 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
20 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
21 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
22 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
23 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
24 * POSSIBILITY OF SUCH DAMAGE.
25 */
26
27 #include <assert.h>
28
29 #include <embb/mtapi/c/mtapi.h>
30
31 #include <embb_mtapi_log.h>
32 #include <mtapi_status_t.h>
33 #include <embb_mtapi_node_t.h>
34 #include <embb_mtapi_task_t.h>
35 #include <embb_mtapi_job_t.h>
36 #include <embb_mtapi_queue_t.h>
37 #include <embb_mtapi_task_queue_t.h>
38 #include <embb_mtapi_thread_context_t.h>
39 #include <embb_mtapi_action_t.h>
40 #include <embb_mtapi_group_t.h>
41 #include <embb_mtapi_scheduler_t.h>
42 #include <embb_mtapi_attr.h>
43 #include <embb_mtapi_task_context_t.h>
44
45
46 /* ---- POOL STORAGE FUNCTIONS --------------------------------------------- */
47
48 #include <embb_mtapi_pool_template-inl.h>
embb_mtapi_pool_implementation(task)49 embb_mtapi_pool_implementation(task)
50
51
52 /* ---- CLASS MEMBERS ------------------------------------------------------ */
53
54 embb_mtapi_task_t* embb_mtapi_task_new(embb_mtapi_task_pool_t* pool) {
55 embb_mtapi_task_t* that;
56
57 assert(MTAPI_NULL != pool);
58
59 that = embb_mtapi_task_pool_allocate(pool);
60 if (MTAPI_NULL != that) {
61 embb_mtapi_task_initialize(that);
62 }
63
64 return that;
65 }
66
embb_mtapi_task_delete(embb_mtapi_task_t * that,embb_mtapi_task_pool_t * pool)67 void embb_mtapi_task_delete(
68 embb_mtapi_task_t* that,
69 embb_mtapi_task_pool_t* pool) {
70 assert(MTAPI_NULL != that);
71 assert(MTAPI_NULL != pool);
72
73 embb_mtapi_task_finalize(that);
74 embb_mtapi_task_pool_deallocate(pool, that);
75 }
76
embb_mtapi_task_initialize(embb_mtapi_task_t * that)77 void embb_mtapi_task_initialize(embb_mtapi_task_t* that) {
78 assert(MTAPI_NULL != that);
79
80 that->action.id = EMBB_MTAPI_IDPOOL_INVALID_ID;
81 that->job.id = EMBB_MTAPI_IDPOOL_INVALID_ID;
82 embb_atomic_init_int(&that->state, MTAPI_TASK_ERROR);
83 that->task_id = MTAPI_TASK_ID_NONE;
84 that->group.id = EMBB_MTAPI_IDPOOL_INVALID_ID;
85 that->queue.id = EMBB_MTAPI_IDPOOL_INVALID_ID;
86 that->error_code = MTAPI_SUCCESS;
87 that->next = MTAPI_NULL;
88 embb_atomic_init_unsigned_int(&that->current_instance, 0);
89 embb_atomic_init_unsigned_int(&that->instances_todo, 0);
90 }
91
embb_mtapi_task_finalize(embb_mtapi_task_t * that)92 void embb_mtapi_task_finalize(embb_mtapi_task_t* that) {
93 assert(MTAPI_NULL != that);
94
95 that->action.id = EMBB_MTAPI_IDPOOL_INVALID_ID;
96 that->job.id = EMBB_MTAPI_IDPOOL_INVALID_ID;
97 embb_atomic_destroy_int(&that->state);
98 that->task_id = MTAPI_TASK_ID_NONE;
99 that->group.id = EMBB_MTAPI_IDPOOL_INVALID_ID;
100 that->queue.id = EMBB_MTAPI_IDPOOL_INVALID_ID;
101 that->error_code = MTAPI_SUCCESS;
102 that->next = MTAPI_NULL;
103 embb_atomic_destroy_unsigned_int(&that->current_instance);
104 embb_atomic_destroy_unsigned_int(&that->instances_todo);
105 }
106
embb_mtapi_task_execute(embb_mtapi_task_t * that,embb_mtapi_task_context_t * context,mtapi_task_state_t * new_task_state)107 mtapi_boolean_t embb_mtapi_task_execute(
108 embb_mtapi_task_t* that,
109 embb_mtapi_task_context_t * context,
110 mtapi_task_state_t * new_task_state) {
111 unsigned int todo = that->attributes.num_instances;
112
113 assert(MTAPI_NULL != that);
114 assert(MTAPI_NULL != context);
115
116 embb_mtapi_task_set_state(that, MTAPI_TASK_RUNNING);
117
118 /* is the associated action valid? */
119 if (embb_mtapi_action_pool_is_handle_valid(
120 context->thread_context->node->action_pool, that->action)) {
121 /* fetch action and execute */
122 embb_mtapi_action_t* local_action =
123 embb_mtapi_action_pool_get_storage_for_handle(
124 context->thread_context->node->action_pool, that->action);
125
126 /* only continue if there was no error so far */
127 if (that->error_code == MTAPI_SUCCESS) {
128 local_action->action_function(
129 that->arguments,
130 that->arguments_size,
131 that->result_buffer,
132 that->result_size,
133 local_action->node_local_data,
134 local_action->node_local_data_size,
135 context);
136 }
137 embb_atomic_memory_barrier();
138 todo = embb_atomic_fetch_and_add_unsigned_int(
139 &that->instances_todo, (unsigned int)-1);
140
141 if (todo == 1) {
142 /* task has completed */
143 if (MTAPI_SUCCESS == that->error_code) {
144 *new_task_state = MTAPI_TASK_COMPLETED;
145 } else if (MTAPI_ERR_ACTION_CANCELLED == that->error_code) {
146 *new_task_state = MTAPI_TASK_CANCELLED;
147 } else {
148 *new_task_state = MTAPI_TASK_ERROR;
149 }
150 }
151 } else {
152 /* action was deleted, task did not complete */
153 that->error_code = MTAPI_ERR_ACTION_DELETED;
154 *new_task_state = MTAPI_TASK_ERROR;
155 todo = 1;
156 }
157
158 if (todo == 1) {
159 return MTAPI_TRUE;
160 } else {
161 return MTAPI_FALSE;
162 }
163 }
164
embb_mtapi_task_set_state(embb_mtapi_task_t * that,mtapi_task_state_t state)165 void embb_mtapi_task_set_state(
166 embb_mtapi_task_t* that,
167 mtapi_task_state_t state) {
168 assert(MTAPI_NULL != that);
169
170 embb_atomic_store_int(&that->state, state);
171 }
172
embb_mtapi_task_start(MTAPI_IN mtapi_task_id_t task_id,MTAPI_IN mtapi_job_hndl_t job,MTAPI_IN void * arguments,MTAPI_IN mtapi_size_t arguments_size,MTAPI_OUT void * result_buffer,MTAPI_IN mtapi_size_t result_size,MTAPI_IN mtapi_task_attributes_t * attributes,MTAPI_IN mtapi_group_hndl_t group,MTAPI_IN mtapi_queue_hndl_t queue,MTAPI_OUT mtapi_status_t * status)173 static mtapi_task_hndl_t embb_mtapi_task_start(
174 MTAPI_IN mtapi_task_id_t task_id,
175 MTAPI_IN mtapi_job_hndl_t job,
176 MTAPI_IN void* arguments,
177 MTAPI_IN mtapi_size_t arguments_size,
178 MTAPI_OUT void* result_buffer,
179 MTAPI_IN mtapi_size_t result_size,
180 MTAPI_IN mtapi_task_attributes_t* attributes,
181 MTAPI_IN mtapi_group_hndl_t group,
182 MTAPI_IN mtapi_queue_hndl_t queue,
183 MTAPI_OUT mtapi_status_t* status) {
184 mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
185 mtapi_task_hndl_t task_hndl = { 0, EMBB_MTAPI_IDPOOL_INVALID_ID };
186
187 embb_mtapi_log_trace("embb_mtapi_task_start() called\n");
188
189 if (embb_mtapi_node_is_initialized()) {
190 embb_mtapi_node_t* node = embb_mtapi_node_get_instance();
191 if (embb_mtapi_job_is_handle_valid(node, job)) {
192 embb_mtapi_job_t* local_job =
193 embb_mtapi_job_get_storage_for_id(node, job.id);
194 embb_mtapi_task_t* task = embb_mtapi_task_pool_allocate(node->task_pool);
195 if (MTAPI_NULL != task) {
196 mtapi_uint_t action_index;
197
198 embb_mtapi_task_initialize(task);
199 embb_mtapi_task_set_state(task, MTAPI_TASK_PRENATAL);
200 task->task_id = task_id;
201 task->job = job;
202 task->arguments = arguments;
203 task->arguments_size = arguments_size;
204 task->result_buffer = result_buffer;
205 task->result_size = result_size;
206
207 if (MTAPI_NULL != attributes) {
208 task->attributes = *attributes;
209 } else {
210 mtapi_taskattr_init(&task->attributes, &local_status);
211 }
212
213 embb_atomic_store_unsigned_int(
214 &task->instances_todo, task->attributes.num_instances);
215
216 if (embb_mtapi_group_pool_is_handle_valid(node->group_pool, group)) {
217 embb_mtapi_group_t* local_group =
218 embb_mtapi_group_pool_get_storage_for_handle(
219 node->group_pool, group);
220 task->group = group;
221 embb_atomic_fetch_and_add_int(&local_group->num_tasks, 1);
222 } else {
223 task->group.id = EMBB_MTAPI_IDPOOL_INVALID_ID;
224 }
225
226 if (embb_mtapi_queue_pool_is_handle_valid(node->queue_pool, queue)) {
227 embb_mtapi_queue_t* local_queue =
228 embb_mtapi_queue_pool_get_storage_for_handle(
229 node->queue_pool, queue);
230 task->queue = queue;
231 embb_mtapi_queue_task_started(local_queue);
232 } else {
233 task->queue.id = EMBB_MTAPI_IDPOOL_INVALID_ID;
234 }
235
236 /* load balancing: choose action with minimum tasks */
237 action_index = 0;
238 for (mtapi_uint_t ii = 0; ii < local_job->num_actions; ii++) {
239 if (embb_mtapi_action_pool_is_handle_valid(
240 node->action_pool, local_job->actions[ii])) {
241 embb_mtapi_action_t * act_m =
242 embb_mtapi_action_pool_get_storage_for_handle(
243 node->action_pool, local_job->actions[action_index]);
244 embb_mtapi_action_t * act_i =
245 embb_mtapi_action_pool_get_storage_for_handle(
246 node->action_pool, local_job->actions[ii]);
247 if (embb_atomic_load_int(&act_m->num_tasks) >
248 embb_atomic_load_int(&act_i->num_tasks)) {
249 action_index = ii;
250 }
251 }
252 }
253 if (embb_mtapi_action_pool_is_handle_valid(
254 node->action_pool, local_job->actions[action_index])) {
255 task->action = local_job->actions[action_index];
256 embb_mtapi_task_set_state(task, MTAPI_TASK_CREATED);
257 task_hndl = task->handle;
258 local_status = MTAPI_SUCCESS;
259 } else {
260 local_status = MTAPI_ERR_ACTION_INVALID;
261 }
262
263 /* check priority for validity */
264 if (node->attributes.max_priorities <= task->attributes.priority) {
265 local_status = MTAPI_ERR_PARAMETER;
266 }
267
268 if (MTAPI_SUCCESS == local_status) {
269 embb_mtapi_scheduler_t * scheduler = node->scheduler;
270 mtapi_boolean_t was_scheduled;
271 embb_mtapi_action_t * local_action =
272 embb_mtapi_action_pool_get_storage_for_handle(
273 node->action_pool, task->action);
274 int num_instances = (int)task->attributes.num_instances;
275
276 /* num_instances more tasks in flight for action */
277 embb_atomic_fetch_and_add_int(
278 &local_action->num_tasks, num_instances);
279
280 embb_mtapi_task_set_state(task, MTAPI_TASK_SCHEDULED);
281
282 if (local_action->is_plugin_action) {
283 /* schedule plugin task */
284 mtapi_status_t plugin_status = MTAPI_ERR_UNKNOWN;
285 local_action->plugin_task_start_function(
286 task_hndl, &plugin_status);
287 was_scheduled = (MTAPI_SUCCESS == plugin_status) ?
288 MTAPI_TRUE : MTAPI_FALSE;
289 } else {
290 /* schedule local task */
291 was_scheduled =
292 embb_mtapi_scheduler_schedule_task(scheduler, task);
293 }
294
295 if (was_scheduled) {
296 /* if task is detached, do not return a handle, it will be deleted
297 on completion */
298 if (task->attributes.is_detached) {
299 task_hndl.id = EMBB_MTAPI_IDPOOL_INVALID_ID;
300 }
301
302 local_status = MTAPI_SUCCESS;
303 } else {
304 /* task could not be pushed */
305 local_status = MTAPI_ERR_TASK_LIMIT;
306 embb_mtapi_task_set_state(task, MTAPI_TASK_ERROR);
307 /* num_instances tasks not in flight for action */
308 embb_atomic_fetch_and_add_int(
309 &local_action->num_tasks, -num_instances);
310 }
311 }
312
313 if (MTAPI_SUCCESS != local_status) {
314 embb_mtapi_task_delete(task, node->task_pool);
315 task_hndl.id = EMBB_MTAPI_IDPOOL_INVALID_ID;
316 }
317 } else {
318 local_status = MTAPI_ERR_TASK_LIMIT;
319 }
320 } else {
321 local_status = MTAPI_ERR_JOB_INVALID;
322 }
323 } else {
324 local_status = MTAPI_ERR_NODE_NOTINIT;
325 }
326
327 mtapi_status_set(status, local_status);
328 return task_hndl;
329 }
330
331
332 /* ---- INTERFACE FUNCTIONS ------------------------------------------------ */
333
mtapi_task_start(MTAPI_IN mtapi_task_id_t task_id,MTAPI_IN mtapi_job_hndl_t job,MTAPI_IN void * arguments,MTAPI_IN mtapi_size_t arguments_size,MTAPI_OUT void * result_buffer,MTAPI_IN mtapi_size_t result_size,MTAPI_IN mtapi_task_attributes_t * attributes,MTAPI_IN mtapi_group_hndl_t group,MTAPI_OUT mtapi_status_t * status)334 mtapi_task_hndl_t mtapi_task_start(
335 MTAPI_IN mtapi_task_id_t task_id,
336 MTAPI_IN mtapi_job_hndl_t job,
337 MTAPI_IN void* arguments,
338 MTAPI_IN mtapi_size_t arguments_size,
339 MTAPI_OUT void* result_buffer, /* pointer to result buffer */
340 MTAPI_IN mtapi_size_t result_size, /* size of one result */
341 MTAPI_IN mtapi_task_attributes_t* attributes,
342 MTAPI_IN mtapi_group_hndl_t group,
343 MTAPI_OUT mtapi_status_t* status) {
344 mtapi_queue_hndl_t queue_hndl = { 0, EMBB_MTAPI_IDPOOL_INVALID_ID };
345
346 embb_mtapi_log_trace("mtapi_task_start() called\n");
347
348 return embb_mtapi_task_start(
349 task_id,
350 job,
351 arguments,
352 arguments_size,
353 result_buffer,
354 result_size,
355 attributes,
356 group,
357 queue_hndl,
358 status);
359 }
360
mtapi_task_enqueue(MTAPI_IN mtapi_task_id_t task_id,MTAPI_IN mtapi_queue_hndl_t queue,MTAPI_IN void * arguments,MTAPI_IN mtapi_size_t arguments_size,MTAPI_OUT void * result_buffer,MTAPI_IN mtapi_size_t result_size,MTAPI_IN mtapi_task_attributes_t * attributes,MTAPI_IN mtapi_group_hndl_t group,MTAPI_OUT mtapi_status_t * status)361 mtapi_task_hndl_t mtapi_task_enqueue(
362 MTAPI_IN mtapi_task_id_t task_id,
363 MTAPI_IN mtapi_queue_hndl_t queue,
364 MTAPI_IN void* arguments,
365 MTAPI_IN mtapi_size_t arguments_size,
366 MTAPI_OUT void* result_buffer, /* pointer to result buffer */
367 MTAPI_IN mtapi_size_t result_size, /* size of one result */
368 MTAPI_IN mtapi_task_attributes_t* attributes,
369 MTAPI_IN mtapi_group_hndl_t group,
370 MTAPI_OUT mtapi_status_t* status) {
371 mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
372 mtapi_task_hndl_t task_hndl = { 0, EMBB_MTAPI_IDPOOL_INVALID_ID };
373
374 embb_mtapi_log_trace("mtapi_task_enqueue() called\n");
375
376 if (embb_mtapi_node_is_initialized()) {
377 embb_mtapi_node_t* node = embb_mtapi_node_get_instance();
378 if (embb_mtapi_queue_pool_is_handle_valid(node->queue_pool, queue)) {
379 embb_mtapi_queue_t* local_queue =
380 embb_mtapi_queue_pool_get_storage_for_handle(node->queue_pool, queue);
381 if ((MTAPI_TRUE == embb_atomic_load_char(&local_queue->enabled)) ||
382 local_queue->attributes.retain) {
383 mtapi_task_attributes_t local_attributes;
384 if (MTAPI_NULL != attributes) {
385 local_attributes = *attributes;
386 } else {
387 mtapi_taskattr_init(&local_attributes, MTAPI_NULL);
388 }
389 local_attributes.priority = local_queue->attributes.priority;
390
391 task_hndl = embb_mtapi_task_start(
392 task_id,
393 local_queue->job_handle,
394 arguments,
395 arguments_size,
396 result_buffer,
397 result_size,
398 &local_attributes,
399 group,
400 queue,
401 &local_status);
402 } else {
403 local_status = MTAPI_ERR_QUEUE_DISABLED;
404 }
405 } else {
406 local_status = MTAPI_ERR_QUEUE_INVALID;
407 }
408 } else {
409 local_status = MTAPI_ERR_NODE_NOTINIT;
410 }
411
412 mtapi_status_set(status, local_status);
413 return task_hndl;
414 }
415
mtapi_task_get_attribute(MTAPI_IN mtapi_task_hndl_t task,MTAPI_IN mtapi_uint_t attribute_num,MTAPI_OUT void * attribute,MTAPI_IN mtapi_size_t attribute_size,MTAPI_OUT mtapi_status_t * status)416 void mtapi_task_get_attribute(
417 MTAPI_IN mtapi_task_hndl_t task,
418 MTAPI_IN mtapi_uint_t attribute_num,
419 MTAPI_OUT void* attribute,
420 MTAPI_IN mtapi_size_t attribute_size,
421 MTAPI_OUT mtapi_status_t* status) {
422 mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
423
424 embb_mtapi_log_trace("mtapi_task_get_attribute() called\n");
425
426 if (embb_mtapi_node_is_initialized()) {
427 embb_mtapi_node_t * node = embb_mtapi_node_get_instance();
428 if (embb_mtapi_task_pool_is_handle_valid(node->task_pool, task)) {
429 embb_mtapi_task_t* local_task =
430 embb_mtapi_task_pool_get_storage_for_handle(node->task_pool, task);
431
432 if (MTAPI_NULL == attribute) {
433 local_status = MTAPI_ERR_PARAMETER;
434 } else {
435 switch (attribute_num) {
436 case MTAPI_TASK_DETACHED:
437 local_status = embb_mtapi_attr_get_mtapi_boolean_t(
438 &local_task->attributes.is_detached, attribute, attribute_size);
439 break;
440
441 case MTAPI_TASK_INSTANCES:
442 local_status = embb_mtapi_attr_get_mtapi_uint_t(
443 &local_task->attributes.num_instances, attribute, attribute_size);
444 break;
445
446 case MTAPI_TASK_PRIORITY:
447 local_status = embb_mtapi_attr_get_mtapi_uint_t(
448 &local_task->attributes.priority, attribute, attribute_size);
449 break;
450
451 default:
452 local_status = MTAPI_ERR_ATTR_NUM;
453 break;
454 }
455 }
456 } else {
457 local_status = MTAPI_ERR_QUEUE_INVALID;
458 }
459 } else {
460 local_status = MTAPI_ERR_NODE_NOTINIT;
461 }
462
463 mtapi_status_set(status, local_status);
464 }
465
mtapi_task_wait(MTAPI_IN mtapi_task_hndl_t task,MTAPI_IN mtapi_timeout_t timeout,MTAPI_OUT mtapi_status_t * status)466 void mtapi_task_wait(
467 MTAPI_IN mtapi_task_hndl_t task,
468 MTAPI_IN mtapi_timeout_t timeout,
469 MTAPI_OUT mtapi_status_t* status) {
470 mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
471
472 embb_mtapi_log_trace("mtapi_task_wait() called\n");
473
474 if (embb_mtapi_node_is_initialized()) {
475 embb_mtapi_node_t* node = embb_mtapi_node_get_instance();
476 if (embb_mtapi_task_pool_is_handle_valid(node->task_pool, task)) {
477 embb_mtapi_task_t* local_task =
478 embb_mtapi_task_pool_get_storage_for_handle(node->task_pool, task);
479 if (embb_mtapi_scheduler_wait_for_task(local_task, timeout)) {
480 local_status = local_task->error_code;
481
482 /* delete task if it is not in a group, otherwise the group will take
483 care of deletion */
484 if (MTAPI_FALSE == embb_mtapi_group_pool_is_handle_valid(
485 node->group_pool, local_task->group)) {
486 embb_mtapi_task_delete(local_task, node->task_pool);
487 }
488 } else {
489 local_status = MTAPI_TIMEOUT;
490 }
491 } else {
492 local_status = MTAPI_ERR_TASK_INVALID;
493 }
494 } else {
495 local_status = MTAPI_ERR_NODE_NOTINIT;
496 }
497
498 mtapi_status_set(status, local_status);
499 }
500
mtapi_task_cancel(MTAPI_IN mtapi_task_hndl_t task,MTAPI_OUT mtapi_status_t * status)501 void mtapi_task_cancel(
502 MTAPI_IN mtapi_task_hndl_t task,
503 MTAPI_OUT mtapi_status_t* status) {
504 mtapi_status_t local_status = MTAPI_ERR_UNKNOWN;
505
506 embb_mtapi_log_trace("mtapi_task_cancel() called\n");
507
508 if (embb_mtapi_node_is_initialized()) {
509 embb_mtapi_node_t* node = embb_mtapi_node_get_instance();
510 if (embb_mtapi_task_pool_is_handle_valid(node->task_pool, task)) {
511 embb_mtapi_task_t* local_task =
512 embb_mtapi_task_pool_get_storage_for_handle(node->task_pool, task);
513
514 /* call plugin action cancel function */
515 if (embb_mtapi_action_pool_is_handle_valid(
516 node->action_pool, local_task->action)) {
517 embb_mtapi_action_t* local_action =
518 embb_mtapi_action_pool_get_storage_for_handle(
519 node->action_pool, local_task->action);
520 if (local_action->is_plugin_action) {
521 local_action->plugin_task_cancel_function(task, &local_status);
522 } else {
523 local_task->error_code = MTAPI_ERR_ACTION_CANCELLED;
524 embb_mtapi_task_set_state(local_task, MTAPI_TASK_CANCELLED);
525 local_status = MTAPI_SUCCESS;
526 }
527 } else {
528 local_task->error_code = MTAPI_ERR_ACTION_CANCELLED;
529 embb_mtapi_task_set_state(local_task, MTAPI_TASK_CANCELLED);
530 local_status = MTAPI_SUCCESS;
531 }
532 } else {
533 local_status = MTAPI_ERR_TASK_INVALID;
534 }
535 } else {
536 local_status = MTAPI_ERR_NODE_NOTINIT;
537 }
538
539 mtapi_status_set(status, local_status);
540 }
541