1 /*
2  Copyright (C) 2015-2017 Alexander Borisov
3 
4  This library is free software; you can redistribute it and/or
5  modify it under the terms of the GNU Lesser General Public
6  License as published by the Free Software Foundation; either
7  version 2.1 of the License, or (at your option) any later version.
8 
9  This library is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  Lesser General Public License for more details.
13 
14  You should have received a copy of the GNU Lesser General Public
15  License along with this library; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 
18  Author: lex.borisov@gmail.com (Alexander Borisov)
19 */
20 
21 #include "mycore/mythread.h"
22 
23 #ifndef MyCORE_BUILD_WITHOUT_THREADS
24 
mythread_create(void)25 mythread_t * mythread_create(void)
26 {
27     return mycore_calloc(1, sizeof(mythread_t));
28 }
29 
mythread_init(mythread_t * mythread,mythread_type_t type,size_t threads_count,size_t id_increase)30 mystatus_t mythread_init(mythread_t *mythread, mythread_type_t type, size_t threads_count, size_t id_increase)
31 {
32     if(threads_count == 0)
33         return MyCORE_STATUS_ERROR;
34 
35     mythread->entries_size   = threads_count;
36     mythread->entries_length = 0;
37     mythread->id_increase    = id_increase;
38     mythread->type           = type;
39 
40     mythread->entries = (mythread_entry_t*)mycore_calloc(mythread->entries_size, sizeof(mythread_entry_t));
41     if(mythread->entries == NULL)
42         return MyCORE_STATUS_ERROR_MEMORY_ALLOCATION;
43 
44     mythread->attr = mythread_thread_attr_init(mythread);
45     if(mythread->attr == NULL)
46         return MyCORE_STATUS_THREAD_ERROR_ATTR_INIT;
47 
48     mythread->timespec = mythread_nanosleep_create(mythread);
49 
50     return MyCORE_STATUS_OK;
51 }
52 
mythread_clean(mythread_t * mythread)53 void mythread_clean(mythread_t *mythread)
54 {
55     mythread_thread_attr_clean(mythread, mythread->attr);
56     mythread_nanosleep_clean(mythread->timespec);
57 
58     mythread->sys_last_error = 0;
59 }
60 
mythread_destroy(mythread_t * mythread,mythread_callback_before_entry_join_f before_join,void * ctx,bool self_destroy)61 mythread_t * mythread_destroy(mythread_t *mythread, mythread_callback_before_entry_join_f before_join, void* ctx, bool self_destroy)
62 {
63     if(mythread == NULL)
64         return NULL;
65 
66     if(mythread->entries) {
67         mythread_resume(mythread, MyTHREAD_OPT_QUIT);
68         mythread_quit(mythread, before_join, ctx);
69         mycore_free(mythread->entries);
70     }
71 
72     mythread_thread_attr_destroy(mythread, mythread->attr);
73     mythread_nanosleep_destroy(mythread->timespec);
74 
75     if(self_destroy) {
76         mycore_free(mythread);
77         return NULL;
78     }
79 
80     return mythread;
81 }
82 
myhread_entry_create(mythread_t * mythread,mythread_process_f process_func,mythread_work_f work_func,mythread_thread_opt_t opt)83 mystatus_t myhread_entry_create(mythread_t *mythread, mythread_process_f process_func, mythread_work_f work_func, mythread_thread_opt_t opt)
84 {
85     mythread->sys_last_error = 0;
86 
87     if(mythread->entries_length >= mythread->entries_size)
88         return MyCORE_STATUS_THREAD_ERROR_NO_SLOTS;
89 
90     mythread_entry_t *entry = &mythread->entries[mythread->entries_length];
91 
92     entry->context.mythread = mythread;
93     entry->context.func     = work_func;
94     entry->context.id       = mythread->entries_length;
95     entry->context.t_count  = mythread->entries_size;
96     entry->context.opt      = opt;
97     entry->context.status   = 0;
98 
99     entry->context.timespec = mythread_nanosleep_create(mythread);
100 
101     entry->context.mutex = mythread_mutex_create(mythread);
102     if(entry->context.mutex == NULL)
103         return MyCORE_STATUS_THREAD_ERROR_MUTEX_INIT;
104 
105     if(mythread_mutex_wait(mythread, entry->context.mutex)) {
106         mythread_mutex_close(mythread, entry->context.mutex);
107         return MyCORE_STATUS_THREAD_ERROR_MUTEX_LOCK;
108     }
109 
110     entry->thread = mythread_thread_create(mythread, process_func, &entry->context);
111     if(entry->thread == NULL) {
112         mythread_mutex_close(mythread, entry->context.mutex);
113         return MyCORE_STATUS_ERROR;
114     }
115 
116     mythread->entries_length++;
117 
118     return MyCORE_STATUS_OK;
119 }
120 
myhread_increase_id_by_entry_id(mythread_t * mythread,mythread_id_t thread_id)121 mythread_id_t myhread_increase_id_by_entry_id(mythread_t* mythread, mythread_id_t thread_id)
122 {
123     return mythread->id_increase + thread_id;
124 }
125 
126 /*
127  * Global functions, for all threads
128  */
mythread_join(mythread_t * mythread,mythread_callback_before_entry_join_f before_join,void * ctx)129 mystatus_t mythread_join(mythread_t *mythread, mythread_callback_before_entry_join_f before_join, void* ctx)
130 {
131     for (size_t i = 0; i < mythread->entries_length; i++) {
132         if(before_join)
133             before_join(mythread, &mythread->entries[i], ctx);
134 
135         if(mythread_thread_join(mythread, mythread->entries[i].thread))
136             return MyCORE_STATUS_ERROR;
137     }
138 
139     return MyCORE_STATUS_OK;
140 }
141 
mythread_quit(mythread_t * mythread,mythread_callback_before_entry_join_f before_join,void * ctx)142 mystatus_t mythread_quit(mythread_t *mythread, mythread_callback_before_entry_join_f before_join, void* ctx)
143 {
144     mythread_option_set(mythread, MyTHREAD_OPT_QUIT);
145 
146     for (size_t i = 0; i < mythread->entries_length; i++)
147     {
148         if(before_join)
149             before_join(mythread, &mythread->entries[i], ctx);
150 
151         if(mythread_thread_join(mythread, mythread->entries[i].thread) ||
152            mythread_thread_destroy(mythread, mythread->entries[i].thread))
153         {
154             return MyCORE_STATUS_ERROR;
155         }
156     }
157 
158     return MyCORE_STATUS_OK;
159 }
160 
mythread_stop(mythread_t * mythread)161 mystatus_t mythread_stop(mythread_t *mythread)
162 {
163     if(mythread->opt & MyTHREAD_OPT_STOP)
164         return MyCORE_STATUS_OK;
165 
166     mythread_option_set(mythread, MyTHREAD_OPT_STOP);
167 
168     for (size_t i = 0; i < mythread->entries_length; i++)
169     {
170         while((mythread->entries[i].context.opt & MyTHREAD_OPT_STOP) == 0) {
171             mythread_nanosleep_sleep(mythread->timespec);
172         }
173     }
174 
175     return MyCORE_STATUS_OK;
176 }
177 
mythread_suspend(mythread_t * mythread)178 mystatus_t mythread_suspend(mythread_t *mythread)
179 {
180     if(mythread->opt & MyTHREAD_OPT_WAIT)
181         return MyCORE_STATUS_OK;
182 
183     mythread_option_set(mythread, MyTHREAD_OPT_WAIT);
184 
185     for (size_t i = 0; i < mythread->entries_length; i++)
186     {
187         while((mythread->entries[i].context.opt & MyTHREAD_OPT_STOP) == 0 &&
188               (mythread->entries[i].context.opt & MyTHREAD_OPT_WAIT) == 0)
189         {
190             mythread_nanosleep_sleep(mythread->timespec);
191         }
192     }
193 
194     return MyCORE_STATUS_OK;
195 }
196 
mythread_resume(mythread_t * mythread,mythread_thread_opt_t send_opt)197 mystatus_t mythread_resume(mythread_t *mythread, mythread_thread_opt_t send_opt)
198 {
199     if(mythread->opt & MyTHREAD_OPT_WAIT) {
200         mythread_option_set(mythread, send_opt);
201         return MyCORE_STATUS_OK;
202     }
203 
204     mythread_option_set(mythread, send_opt);
205 
206     for (size_t i = 0; i < mythread->entries_length; i++)
207     {
208         if(mythread->entries[i].context.opt & MyTHREAD_OPT_STOP) {
209             mythread->entries[i].context.opt = send_opt;
210 
211             if(mythread_mutex_post(mythread, mythread->entries[i].context.mutex))
212                 return MyCORE_STATUS_ERROR;
213         }
214     }
215 
216     return MyCORE_STATUS_OK;
217 }
218 
mythread_check_status(mythread_t * mythread)219 mystatus_t mythread_check_status(mythread_t *mythread)
220 {
221     for (size_t i = 0; i < mythread->entries_length; i++)
222     {
223         if(mythread->entries[i].context.status)
224             return mythread->entries[i].context.status;
225     }
226 
227     return MyCORE_STATUS_OK;
228 }
229 
mythread_option(mythread_t * mythread)230 mythread_thread_opt_t mythread_option(mythread_t *mythread)
231 {
232     return mythread->opt;
233 }
234 
mythread_option_set(mythread_t * mythread,mythread_thread_opt_t opt)235 void mythread_option_set(mythread_t *mythread, mythread_thread_opt_t opt)
236 {
237     mythread->opt = opt;
238 }
239 
240 /*
241  * Entries functions, for all threads
242  */
mythread_entry_join(mythread_entry_t * entry,mythread_callback_before_entry_join_f before_join,void * ctx)243 mystatus_t mythread_entry_join(mythread_entry_t* entry, mythread_callback_before_entry_join_f before_join, void* ctx)
244 {
245     if(before_join)
246         before_join(entry->context.mythread, entry, ctx);
247 
248     if(mythread_thread_join(entry->context.mythread, entry->thread))
249         return MyCORE_STATUS_ERROR;
250 
251     return MyCORE_STATUS_OK;
252 }
253 
mythread_entry_quit(mythread_entry_t * entry,mythread_callback_before_entry_join_f before_join,void * ctx)254 mystatus_t mythread_entry_quit(mythread_entry_t* entry, mythread_callback_before_entry_join_f before_join, void* ctx)
255 {
256     if(before_join)
257         before_join(entry->context.mythread, entry, ctx);
258 
259     if(mythread_thread_join(entry->context.mythread, entry->thread) ||
260        mythread_thread_destroy(entry->context.mythread, entry->thread))
261     {
262         return MyCORE_STATUS_ERROR;
263     }
264 
265     return MyCORE_STATUS_OK;
266 }
267 
mythread_entry_stop(mythread_entry_t * entry)268 mystatus_t mythread_entry_stop(mythread_entry_t* entry)
269 {
270     if(entry->context.opt & MyTHREAD_OPT_STOP)
271         return MyCORE_STATUS_OK;
272 
273     entry->context.opt = MyTHREAD_OPT_STOP;
274 
275     while((entry->context.opt & MyTHREAD_OPT_STOP) == 0) {
276         mythread_nanosleep_sleep(entry->context.mythread->timespec);
277     }
278 
279     return MyCORE_STATUS_OK;
280 }
281 
mythread_entry_suspend(mythread_entry_t * entry)282 mystatus_t mythread_entry_suspend(mythread_entry_t* entry)
283 {
284     if(entry->context.opt & MyTHREAD_OPT_WAIT)
285         return MyCORE_STATUS_OK;
286 
287     entry->context.opt = MyTHREAD_OPT_WAIT;
288 
289     while((entry->context.opt & MyTHREAD_OPT_STOP) == 0 && (entry->context.opt & MyTHREAD_OPT_WAIT) == 0) {
290         mythread_nanosleep_sleep(entry->context.mythread->timespec);
291     }
292 
293     return MyCORE_STATUS_OK;
294 }
295 
mythread_entry_resume(mythread_entry_t * entry,mythread_thread_opt_t send_opt)296 mystatus_t mythread_entry_resume(mythread_entry_t* entry, mythread_thread_opt_t send_opt)
297 {
298     if(entry->context.opt & MyTHREAD_OPT_WAIT) {
299         entry->context.opt = send_opt;
300     }
301     else if(entry->context.opt & MyTHREAD_OPT_STOP) {
302         entry->context.opt = send_opt;
303 
304         if(mythread_mutex_post(entry->context.mythread, entry->context.mutex))
305             return MyCORE_STATUS_ERROR;
306     }
307     else
308         entry->context.opt = send_opt;
309 
310     return MyCORE_STATUS_OK;
311 }
312 
mythread_entry_status(mythread_entry_t * entry)313 mystatus_t mythread_entry_status(mythread_entry_t* entry)
314 {
315     return entry->context.status;
316 }
317 
mythread_entry_mythread(mythread_entry_t * entry)318 mythread_t * mythread_entry_mythread(mythread_entry_t* entry)
319 {
320     return entry->context.mythread;
321 }
322 
323 /* Callbacks */
mythread_callback_quit(mythread_t * mythread,mythread_entry_t * entry,void * ctx)324 void mythread_callback_quit(mythread_t* mythread, mythread_entry_t* entry, void* ctx)
325 {
326     while((entry->context.opt & MyTHREAD_OPT_QUIT) == 0)
327         mythread_nanosleep_sleep(mythread->timespec);
328 }
329 
330 #endif
331 
332