1 /*
2 Copyright (C) 2015-2017 Alexander Borisov
3
4 This library is free software; you can redistribute it and/or
5 modify it under the terms of the GNU Lesser General Public
6 License as published by the Free Software Foundation; either
7 version 2.1 of the License, or (at your option) any later version.
8
9 This library is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 Lesser General Public License for more details.
13
14 You should have received a copy of the GNU Lesser General Public
15 License along with this library; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
18 Author: lex.borisov@gmail.com (Alexander Borisov)
19 */
20
21 #include "mycore/mythread.h"
22
23 #ifndef MyCORE_BUILD_WITHOUT_THREADS
24
mythread_create(void)25 mythread_t * mythread_create(void)
26 {
27 return mycore_calloc(1, sizeof(mythread_t));
28 }
29
mythread_init(mythread_t * mythread,mythread_type_t type,size_t threads_count,size_t id_increase)30 mystatus_t mythread_init(mythread_t *mythread, mythread_type_t type, size_t threads_count, size_t id_increase)
31 {
32 if(threads_count == 0)
33 return MyCORE_STATUS_ERROR;
34
35 mythread->entries_size = threads_count;
36 mythread->entries_length = 0;
37 mythread->id_increase = id_increase;
38 mythread->type = type;
39
40 mythread->entries = (mythread_entry_t*)mycore_calloc(mythread->entries_size, sizeof(mythread_entry_t));
41 if(mythread->entries == NULL)
42 return MyCORE_STATUS_ERROR_MEMORY_ALLOCATION;
43
44 mythread->attr = mythread_thread_attr_init(mythread);
45 if(mythread->attr == NULL)
46 return MyCORE_STATUS_THREAD_ERROR_ATTR_INIT;
47
48 mythread->timespec = mythread_nanosleep_create(mythread);
49
50 return MyCORE_STATUS_OK;
51 }
52
/* Reset a pool between uses: release the thread attribute and the
   sleep-interval object, and clear the cached system error code.
   The entries array itself is kept (it is freed by mythread_destroy()). */
void mythread_clean(mythread_t *mythread)
{
    mythread->sys_last_error = 0;
    
    mythread_thread_attr_clean(mythread, mythread->attr);
    mythread_nanosleep_clean(mythread->timespec);
}
60
/* Tear down a pool: wake all workers with the QUIT flag, join and destroy
   their threads (invoking `before_join` for each, if given), then release
   the per-pool resources. When `self_destroy` is true the pool object
   itself is freed and NULL is returned; otherwise the (emptied) pool
   pointer is returned for reuse. NULL input is a no-op. */
mythread_t * mythread_destroy(mythread_t *mythread, mythread_callback_before_entry_join_f before_join, void* ctx, bool self_destroy)
{
    if(mythread == NULL)
        return NULL;
    
    if(mythread->entries != NULL) {
        /* unblock any workers parked on their mutex, then collect them all */
        mythread_resume(mythread, MyTHREAD_OPT_QUIT);
        mythread_quit(mythread, before_join, ctx);
        
        mycore_free(mythread->entries);
    }
    
    mythread_thread_attr_destroy(mythread, mythread->attr);
    mythread_nanosleep_destroy(mythread->timespec);
    
    if(self_destroy == false)
        return mythread;
    
    mycore_free(mythread);
    return NULL;
}
82
myhread_entry_create(mythread_t * mythread,mythread_process_f process_func,mythread_work_f work_func,mythread_thread_opt_t opt)83 mystatus_t myhread_entry_create(mythread_t *mythread, mythread_process_f process_func, mythread_work_f work_func, mythread_thread_opt_t opt)
84 {
85 mythread->sys_last_error = 0;
86
87 if(mythread->entries_length >= mythread->entries_size)
88 return MyCORE_STATUS_THREAD_ERROR_NO_SLOTS;
89
90 mythread_entry_t *entry = &mythread->entries[mythread->entries_length];
91
92 entry->context.mythread = mythread;
93 entry->context.func = work_func;
94 entry->context.id = mythread->entries_length;
95 entry->context.t_count = mythread->entries_size;
96 entry->context.opt = opt;
97 entry->context.status = 0;
98
99 entry->context.timespec = mythread_nanosleep_create(mythread);
100
101 entry->context.mutex = mythread_mutex_create(mythread);
102 if(entry->context.mutex == NULL)
103 return MyCORE_STATUS_THREAD_ERROR_MUTEX_INIT;
104
105 if(mythread_mutex_wait(mythread, entry->context.mutex)) {
106 mythread_mutex_close(mythread, entry->context.mutex);
107 return MyCORE_STATUS_THREAD_ERROR_MUTEX_LOCK;
108 }
109
110 entry->thread = mythread_thread_create(mythread, process_func, &entry->context);
111 if(entry->thread == NULL) {
112 mythread_mutex_close(mythread, entry->context.mutex);
113 return MyCORE_STATUS_ERROR;
114 }
115
116 mythread->entries_length++;
117
118 return MyCORE_STATUS_OK;
119 }
120
/* Map a pool-local entry id onto the global id space by applying the
   pool's configured offset (see mythread_init's `id_increase`). */
mythread_id_t myhread_increase_id_by_entry_id(mythread_t* mythread, mythread_id_t thread_id)
{
    mythread_id_t global_id = mythread->id_increase + thread_id;
    return global_id;
}
125
126 /*
127 * Global functions, for all threads
128 */
mythread_join(mythread_t * mythread,mythread_callback_before_entry_join_f before_join,void * ctx)129 mystatus_t mythread_join(mythread_t *mythread, mythread_callback_before_entry_join_f before_join, void* ctx)
130 {
131 for (size_t i = 0; i < mythread->entries_length; i++) {
132 if(before_join)
133 before_join(mythread, &mythread->entries[i], ctx);
134
135 if(mythread_thread_join(mythread, mythread->entries[i].thread))
136 return MyCORE_STATUS_ERROR;
137 }
138
139 return MyCORE_STATUS_OK;
140 }
141
mythread_quit(mythread_t * mythread,mythread_callback_before_entry_join_f before_join,void * ctx)142 mystatus_t mythread_quit(mythread_t *mythread, mythread_callback_before_entry_join_f before_join, void* ctx)
143 {
144 mythread_option_set(mythread, MyTHREAD_OPT_QUIT);
145
146 for (size_t i = 0; i < mythread->entries_length; i++)
147 {
148 if(before_join)
149 before_join(mythread, &mythread->entries[i], ctx);
150
151 if(mythread_thread_join(mythread, mythread->entries[i].thread) ||
152 mythread_thread_destroy(mythread, mythread->entries[i].thread))
153 {
154 return MyCORE_STATUS_ERROR;
155 }
156 }
157
158 return MyCORE_STATUS_OK;
159 }
160
mythread_stop(mythread_t * mythread)161 mystatus_t mythread_stop(mythread_t *mythread)
162 {
163 if(mythread->opt & MyTHREAD_OPT_STOP)
164 return MyCORE_STATUS_OK;
165
166 mythread_option_set(mythread, MyTHREAD_OPT_STOP);
167
168 for (size_t i = 0; i < mythread->entries_length; i++)
169 {
170 while((mythread->entries[i].context.opt & MyTHREAD_OPT_STOP) == 0) {
171 mythread_nanosleep_sleep(mythread->timespec);
172 }
173 }
174
175 return MyCORE_STATUS_OK;
176 }
177
mythread_suspend(mythread_t * mythread)178 mystatus_t mythread_suspend(mythread_t *mythread)
179 {
180 if(mythread->opt & MyTHREAD_OPT_WAIT)
181 return MyCORE_STATUS_OK;
182
183 mythread_option_set(mythread, MyTHREAD_OPT_WAIT);
184
185 for (size_t i = 0; i < mythread->entries_length; i++)
186 {
187 while((mythread->entries[i].context.opt & MyTHREAD_OPT_STOP) == 0 &&
188 (mythread->entries[i].context.opt & MyTHREAD_OPT_WAIT) == 0)
189 {
190 mythread_nanosleep_sleep(mythread->timespec);
191 }
192 }
193
194 return MyCORE_STATUS_OK;
195 }
196
mythread_resume(mythread_t * mythread,mythread_thread_opt_t send_opt)197 mystatus_t mythread_resume(mythread_t *mythread, mythread_thread_opt_t send_opt)
198 {
199 if(mythread->opt & MyTHREAD_OPT_WAIT) {
200 mythread_option_set(mythread, send_opt);
201 return MyCORE_STATUS_OK;
202 }
203
204 mythread_option_set(mythread, send_opt);
205
206 for (size_t i = 0; i < mythread->entries_length; i++)
207 {
208 if(mythread->entries[i].context.opt & MyTHREAD_OPT_STOP) {
209 mythread->entries[i].context.opt = send_opt;
210
211 if(mythread_mutex_post(mythread, mythread->entries[i].context.mutex))
212 return MyCORE_STATUS_ERROR;
213 }
214 }
215
216 return MyCORE_STATUS_OK;
217 }
218
mythread_check_status(mythread_t * mythread)219 mystatus_t mythread_check_status(mythread_t *mythread)
220 {
221 for (size_t i = 0; i < mythread->entries_length; i++)
222 {
223 if(mythread->entries[i].context.status)
224 return mythread->entries[i].context.status;
225 }
226
227 return MyCORE_STATUS_OK;
228 }
229
mythread_option(mythread_t * mythread)230 mythread_thread_opt_t mythread_option(mythread_t *mythread)
231 {
232 return mythread->opt;
233 }
234
/* Mutator: overwrite (not OR) the pool-wide option flags with `opt`. */
void mythread_option_set(mythread_t *mythread, mythread_thread_opt_t opt)
{
    mythread->opt = opt;
}
239
240 /*
241 * Entries functions, for all threads
242 */
mythread_entry_join(mythread_entry_t * entry,mythread_callback_before_entry_join_f before_join,void * ctx)243 mystatus_t mythread_entry_join(mythread_entry_t* entry, mythread_callback_before_entry_join_f before_join, void* ctx)
244 {
245 if(before_join)
246 before_join(entry->context.mythread, entry, ctx);
247
248 if(mythread_thread_join(entry->context.mythread, entry->thread))
249 return MyCORE_STATUS_ERROR;
250
251 return MyCORE_STATUS_OK;
252 }
253
mythread_entry_quit(mythread_entry_t * entry,mythread_callback_before_entry_join_f before_join,void * ctx)254 mystatus_t mythread_entry_quit(mythread_entry_t* entry, mythread_callback_before_entry_join_f before_join, void* ctx)
255 {
256 if(before_join)
257 before_join(entry->context.mythread, entry, ctx);
258
259 if(mythread_thread_join(entry->context.mythread, entry->thread) ||
260 mythread_thread_destroy(entry->context.mythread, entry->thread))
261 {
262 return MyCORE_STATUS_ERROR;
263 }
264
265 return MyCORE_STATUS_OK;
266 }
267
/* Request a single worker to stop. A no-op when STOP is already set.
   Always returns MyCORE_STATUS_OK.
   NOTE(review): unlike mythread_stop() — which sets the pool-wide flag
   and waits for the worker itself to raise STOP in context.opt — this
   function writes STOP into context.opt directly, so the wait loop below
   is immediately false and never runs unless the worker concurrently
   rewrites context.opt between the store and the check. Confirm whether
   the intent was to set mythread->opt instead. */
mystatus_t mythread_entry_stop(mythread_entry_t* entry)
{
    if(entry->context.opt & MyTHREAD_OPT_STOP)
        return MyCORE_STATUS_OK;
    
    entry->context.opt = MyTHREAD_OPT_STOP;
    
    /* see NOTE(review) above: condition is normally false on first test */
    while((entry->context.opt & MyTHREAD_OPT_STOP) == 0) {
        mythread_nanosleep_sleep(entry->context.mythread->timespec);
    }
    
    return MyCORE_STATUS_OK;
}
281
/* Request a single worker to suspend. A no-op when WAIT is already set.
   Always returns MyCORE_STATUS_OK.
   NOTE(review): same pattern as mythread_entry_stop() — WAIT is written
   into context.opt directly, so the wait loop below is immediately false
   and never runs unless the worker concurrently rewrites context.opt.
   Compare mythread_suspend(), which sets the pool-wide flag and waits for
   the worker's acknowledgement. Confirm intent. */
mystatus_t mythread_entry_suspend(mythread_entry_t* entry)
{
    if(entry->context.opt & MyTHREAD_OPT_WAIT)
        return MyCORE_STATUS_OK;
    
    entry->context.opt = MyTHREAD_OPT_WAIT;
    
    /* see NOTE(review) above: condition is normally false on first test */
    while((entry->context.opt & MyTHREAD_OPT_STOP) == 0 && (entry->context.opt & MyTHREAD_OPT_WAIT) == 0) {
        mythread_nanosleep_sleep(entry->context.mythread->timespec);
    }
    
    return MyCORE_STATUS_OK;
}
295
mythread_entry_resume(mythread_entry_t * entry,mythread_thread_opt_t send_opt)296 mystatus_t mythread_entry_resume(mythread_entry_t* entry, mythread_thread_opt_t send_opt)
297 {
298 if(entry->context.opt & MyTHREAD_OPT_WAIT) {
299 entry->context.opt = send_opt;
300 }
301 else if(entry->context.opt & MyTHREAD_OPT_STOP) {
302 entry->context.opt = send_opt;
303
304 if(mythread_mutex_post(entry->context.mythread, entry->context.mutex))
305 return MyCORE_STATUS_ERROR;
306 }
307 else
308 entry->context.opt = send_opt;
309
310 return MyCORE_STATUS_OK;
311 }
312
mythread_entry_status(mythread_entry_t * entry)313 mystatus_t mythread_entry_status(mythread_entry_t* entry)
314 {
315 return entry->context.status;
316 }
317
mythread_entry_mythread(mythread_entry_t * entry)318 mythread_t * mythread_entry_mythread(mythread_entry_t* entry)
319 {
320 return entry->context.mythread;
321 }
322
323 /* Callbacks */
/* before_join callback: block (with nanosleep back-off) until the worker
   has raised its QUIT flag. `ctx` is unused. */
void mythread_callback_quit(mythread_t* mythread, mythread_entry_t* entry, void* ctx)
{
    (void)ctx;
    
    for(;;) {
        if(entry->context.opt & MyTHREAD_OPT_QUIT)
            break;
        
        mythread_nanosleep_sleep(mythread->timespec);
    }
}
329
330 #endif
331
332