1 /*
2 * Claws Mail -- a GTK+ based, lightweight, and fast e-mail client
3 * Copyright (C) 2005-2012 DINH Viet Hoa and the Claws Mail team
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 *
18 */
19
20 #ifdef HAVE_CONFIG_H
21 # include "config.h"
22 #include "claws-features.h"
23 #endif
24
25 #ifdef HAVE_LIBETPAN
26
27 #include "etpan-thread-manager.h"
28
#include <glib.h>

#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdlib.h>
#include <unistd.h>

#include <libetpan/mailsem.h>
36
37 #include "etpan-errors.h"
38 #include "utils.h"
39
40 #define POOL_UNBOUND_MAX 4
41
42 #define POOL_INIT_SIZE 8
43 #define OP_INIT_SIZE 8
44
/*
 * Forward declarations of the file-local helpers, grouped by the object
 * they operate on.
 */

/* worker thread: lifecycle */
static struct etpan_thread * etpan_thread_new(void);
static void etpan_thread_free(struct etpan_thread * thread);
static int etpan_thread_start(struct etpan_thread * thread);
static void etpan_thread_stop(struct etpan_thread * thread);
static void etpan_thread_join(struct etpan_thread * thread);

/* worker thread: state queries */
static unsigned int etpan_thread_get_load(struct etpan_thread * thread);
static int etpan_thread_is_bound(struct etpan_thread * thread);

/* operation: locking and cancellation state */
static void etpan_thread_op_lock(struct etpan_thread_op * op);
static void etpan_thread_op_unlock(struct etpan_thread_op * op);
static int etpan_thread_op_cancelled(struct etpan_thread_op * op);

/* manager */
static int etpan_thread_manager_is_stopped(struct etpan_thread_manager * manager);
56
57 #if 0
58 static void etpan_thread_bind(struct etpan_thread * thread);
59 static int etpan_thread_manager_op_schedule(struct etpan_thread_manager * manager,
60 struct etpan_thread_op * op);
61 static void etpan_thread_manager_start(struct etpan_thread_manager * manager);
62 static void etpan_thread_op_cancel(struct etpan_thread_op * op);
63 #endif
64
/*
 * Values stored in the terminate_state field of struct etpan_thread.
 */
enum {
  TERMINATE_STATE_NONE = 0,      /* initial value given by etpan_thread_new() */
  TERMINATE_STATE_REQUESTED = 1, /* set by etpan_thread_stop() */
  TERMINATE_STATE_DONE = 2,      /* set by thread_run() right before it returns */
};
70
etpan_thread_manager_new(void)71 struct etpan_thread_manager * etpan_thread_manager_new(void)
72 {
73 struct etpan_thread_manager * manager;
74 int r;
75
76 manager = malloc(sizeof(* manager));
77 if (manager == NULL)
78 goto err;
79
80 manager->thread_pool = carray_new(POOL_INIT_SIZE);
81 if (manager->thread_pool == NULL)
82 goto free;
83
84 manager->thread_pending = carray_new(POOL_INIT_SIZE);
85 if (manager->thread_pending == NULL)
86 goto free_pool;
87
88 manager->can_create_thread = 1;
89 manager->unbound_count = 0;
90
91 r = pipe(manager->notify_fds);
92 if (r < 0)
93 goto free_pending;
94
95 return manager;
96
97 free_pending:
98 carray_free(manager->thread_pending);
99 free_pool:
100 carray_free(manager->thread_pool);
101 free:
102 free(manager);
103 err:
104 return NULL;
105 }
106
etpan_thread_manager_free(struct etpan_thread_manager * manager)107 void etpan_thread_manager_free(struct etpan_thread_manager * manager)
108 {
109 close(manager->notify_fds[1]);
110 close(manager->notify_fds[0]);
111 carray_free(manager->thread_pending);
112 carray_free(manager->thread_pool);
113 free(manager);
114 }
115
etpan_thread_new(void)116 static struct etpan_thread * etpan_thread_new(void)
117 {
118 struct etpan_thread * thread;
119 int r;
120
121 thread = malloc(sizeof(* thread));
122 if (thread == NULL)
123 goto err;
124
125 r = pthread_mutex_init(&thread->lock, NULL);
126 if (r != 0)
127 goto free;
128
129 thread->op_list = carray_new(OP_INIT_SIZE);
130 if (thread->op_list == NULL)
131 goto destroy_lock;
132
133 thread->op_done_list = carray_new(OP_INIT_SIZE);
134 if (thread->op_done_list == NULL)
135 goto free_op_list;
136
137 thread->start_sem = mailsem_new();
138 if (thread->start_sem == NULL)
139 goto free_op_done_list;
140
141 thread->stop_sem = mailsem_new();
142 if (thread->stop_sem == NULL)
143 goto free_startsem;
144
145 thread->op_sem = mailsem_new();
146 if (thread->op_sem == NULL)
147 goto free_stopsem;
148
149 thread->manager = NULL;
150 thread->bound_count = 0;
151 thread->terminate_state = TERMINATE_STATE_NONE;
152
153 return thread;
154
155 free_stopsem:
156 mailsem_free(thread->stop_sem);
157 free_startsem:
158 mailsem_free(thread->start_sem);
159 free_op_done_list:
160 carray_free(thread->op_done_list);
161 free_op_list:
162 carray_free(thread->op_list);
163 destroy_lock:
164 pthread_mutex_destroy(&thread->lock);
165 free:
166 free(thread);
167 err:
168 return NULL;
169 }
170
etpan_thread_free(struct etpan_thread * thread)171 static void etpan_thread_free(struct etpan_thread * thread)
172 {
173 mailsem_free(thread->op_sem);
174 mailsem_free(thread->stop_sem);
175 mailsem_free(thread->start_sem);
176 carray_free(thread->op_done_list);
177 carray_free(thread->op_list);
178 pthread_mutex_destroy(&thread->lock);
179 free(thread);
180 }
181
etpan_thread_op_new(void)182 struct etpan_thread_op * etpan_thread_op_new(void)
183 {
184 struct etpan_thread_op * op;
185 int r;
186
187 op = malloc(sizeof(* op));
188 if (op == NULL)
189 goto err;
190
191 memset(op, 0, sizeof(* op));
192
193 r = pthread_mutex_init(&op->lock, NULL);
194 if (r != 0)
195 goto free;
196
197 return op;
198
199 free:
200 free(op);
201 err:
202 return NULL;
203 }
204
etpan_thread_op_free(struct etpan_thread_op * op)205 void etpan_thread_op_free(struct etpan_thread_op * op)
206 {
207 pthread_mutex_destroy(&op->lock);
208 free(op);
209 }
210
211 static struct etpan_thread *
etpan_thread_manager_create_thread(struct etpan_thread_manager * manager)212 etpan_thread_manager_create_thread(struct etpan_thread_manager * manager)
213 {
214 struct etpan_thread * thread;
215 int r;
216
217 thread = etpan_thread_new();
218 if (thread == NULL)
219 goto err;
220
221 thread->manager = manager;
222
223 r = etpan_thread_start(thread);
224 if (r != NO_ERROR)
225 goto free_thread;
226
227 r = carray_add(manager->thread_pool, thread, NULL);
228 if (r < 0) {
229 etpan_thread_stop(thread);
230 goto free_thread;
231 }
232
233 return thread;
234
235 free_thread:
236 etpan_thread_free(thread);
237 err:
238 return NULL;
239 }
240
241 static void
etpan_thread_manager_terminate_thread(struct etpan_thread_manager * manager,struct etpan_thread * thread)242 etpan_thread_manager_terminate_thread(struct etpan_thread_manager * manager,
243 struct etpan_thread * thread)
244 {
245 unsigned int i;
246 int r;
247
248 for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
249 if (carray_get(manager->thread_pool, i) == thread) {
250 carray_delete(manager->thread_pool, i);
251 break;
252 }
253 }
254
255 if (!etpan_thread_is_bound(thread))
256 manager->unbound_count --;
257
258 r = carray_add(manager->thread_pending, thread, NULL);
259 if (r < 0) {
260 g_warning("complete failure of thread due to lack of memory (thread stop)");
261 }
262
263 etpan_thread_stop(thread);
264 }
265
manager_notify(struct etpan_thread_manager * manager)266 static void manager_notify(struct etpan_thread_manager * manager)
267 {
268 char ch;
269 ssize_t r;
270
271 ch = 1;
272 r = write(manager->notify_fds[1], &ch, 1);
273 if (r < 0) {
274 g_warning("error writing notification to etpan thread manager");
275 }
276 }
277
manager_ack(struct etpan_thread_manager * manager)278 static void manager_ack(struct etpan_thread_manager * manager)
279 {
280 #ifndef G_OS_WIN32
281 char ch;
282 ssize_t r;
283 r = read(manager->notify_fds[0], &ch, 1);
284 if (r != 1) {
285 g_warning("error reading notification from etpan thread manager");
286 }
287 #else
288 /* done in the GIOChannel handler in imap-thread.c and nntp-thread.c */
289 #endif
290 }
291
thread_lock(struct etpan_thread * thread)292 static void thread_lock(struct etpan_thread * thread)
293 {
294 pthread_mutex_lock(&thread->lock);
295 }
296
thread_unlock(struct etpan_thread * thread)297 static void thread_unlock(struct etpan_thread * thread)
298 {
299 pthread_mutex_unlock(&thread->lock);
300 }
301
thread_notify(struct etpan_thread * thread)302 static void thread_notify(struct etpan_thread * thread)
303 {
304 manager_notify(thread->manager);
305 }
306
thread_run(void * data)307 static void * thread_run(void * data)
308 {
309 struct etpan_thread * thread;
310 int r;
311
312 thread = data;
313
314 mailsem_up(thread->start_sem);
315
316 while (1) {
317 int do_quit;
318 struct etpan_thread_op * op;
319
320 mailsem_down(thread->op_sem);
321
322 do_quit = 0;
323 op = NULL;
324 thread_lock(thread);
325 if (carray_count(thread->op_list) > 0) {
326 op = carray_get(thread->op_list, 0);
327 carray_delete_slow(thread->op_list, 0);
328 }
329 else {
330 do_quit = 1;
331 }
332 thread_unlock(thread);
333
334 if (do_quit) {
335 break;
336 }
337
338 if (!etpan_thread_op_cancelled(op)) {
339 if (op->run != NULL)
340 op->run(op);
341 }
342
343 thread_lock(thread);
344 r = carray_add(thread->op_done_list, op, NULL);
345 if (r < 0) {
346 g_warning("complete failure of thread due to lack of memory (op done)");
347 }
348 thread_unlock(thread);
349
350 thread_notify(thread);
351 }
352
353 thread_lock(thread);
354 thread->terminate_state = TERMINATE_STATE_DONE;
355 thread_unlock(thread);
356
357 thread_notify(thread);
358
359 mailsem_up(thread->stop_sem);
360
361 return NULL;
362 }
363
etpan_thread_start(struct etpan_thread * thread)364 static int etpan_thread_start(struct etpan_thread * thread)
365 {
366 int r;
367
368 r = pthread_create(&thread->th_id, NULL, thread_run, thread);
369 if (r != 0)
370 return ERROR_MEMORY;
371
372 mailsem_down(thread->start_sem);
373
374 return NO_ERROR;
375 }
376
etpan_thread_stop(struct etpan_thread * thread)377 static void etpan_thread_stop(struct etpan_thread * thread)
378 {
379 thread_lock(thread);
380 thread->terminate_state = TERMINATE_STATE_REQUESTED;
381 thread_unlock(thread);
382
383 mailsem_up(thread->op_sem);
384
385 /* this thread will be joined in the manager loop */
386 }
387
etpan_thread_is_stopped(struct etpan_thread * thread)388 static int etpan_thread_is_stopped(struct etpan_thread * thread)
389 {
390 int stopped;
391
392 thread_lock(thread);
393 stopped = (thread->terminate_state == TERMINATE_STATE_DONE);
394 thread_unlock(thread);
395
396 return stopped;
397 }
398
etpan_thread_join(struct etpan_thread * thread)399 static void etpan_thread_join(struct etpan_thread * thread)
400 {
401 mailsem_down(thread->stop_sem);
402 pthread_join(thread->th_id, NULL);
403 }
404
405 struct etpan_thread *
etpan_thread_manager_get_thread(struct etpan_thread_manager * manager)406 etpan_thread_manager_get_thread(struct etpan_thread_manager * manager)
407 {
408 struct etpan_thread * chosen_thread;
409 unsigned int chosen_thread_load;
410 unsigned int i;
411 struct etpan_thread * thread;
412
413 /* chose a thread */
414
415 chosen_thread = NULL;
416 chosen_thread_load = 0;
417
418 for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
419 thread = carray_get(manager->thread_pool, i);
420 if (etpan_thread_is_bound(thread))
421 continue;
422
423 if (chosen_thread == NULL) {
424 chosen_thread = thread;
425 chosen_thread_load = etpan_thread_get_load(thread);
426
427 if (chosen_thread_load == 0)
428 break;
429 }
430 else {
431 unsigned int load;
432
433 load = etpan_thread_get_load(thread);
434
435 if (load < chosen_thread_load) {
436 chosen_thread = thread;
437 chosen_thread_load = load;
438 }
439 }
440 }
441
442 if (chosen_thread != NULL) {
443 if (manager->can_create_thread && (chosen_thread_load != 0)) {
444 chosen_thread = NULL;
445 }
446 }
447
448 /* choice done */
449
450 if (chosen_thread != NULL)
451 return chosen_thread;
452
453 thread = etpan_thread_manager_create_thread(manager);
454 if (thread == NULL)
455 goto err;
456
457 manager->unbound_count ++;
458 if (manager->unbound_count >= POOL_UNBOUND_MAX)
459 manager->can_create_thread = 0;
460
461 return thread;
462
463 err:
464 return NULL;
465 }
466
etpan_thread_get_load(struct etpan_thread * thread)467 static unsigned int etpan_thread_get_load(struct etpan_thread * thread)
468 {
469 unsigned int load;
470
471 thread_lock(thread);
472 load = carray_count(thread->op_list);
473 thread_unlock(thread);
474
475 return load;
476 }
477
478 #if 0
479 static void etpan_thread_bind(struct etpan_thread * thread)
480 {
481 thread->bound_count ++;
482 }
483 #endif
484
etpan_thread_unbind(struct etpan_thread * thread)485 void etpan_thread_unbind(struct etpan_thread * thread)
486 {
487 thread->bound_count --;
488 }
489
etpan_thread_is_bound(struct etpan_thread * thread)490 static int etpan_thread_is_bound(struct etpan_thread * thread)
491 {
492 return (thread->bound_count != 0);
493 }
494
etpan_thread_op_schedule(struct etpan_thread * thread,struct etpan_thread_op * op)495 int etpan_thread_op_schedule(struct etpan_thread * thread,
496 struct etpan_thread_op * op)
497 {
498 int r;
499
500 if (thread->terminate_state != TERMINATE_STATE_NONE)
501 return ERROR_INVAL;
502
503 thread_lock(thread);
504 r = carray_add(thread->op_list, op, NULL);
505 thread_unlock(thread);
506
507 if (r < 0)
508 return ERROR_MEMORY;
509
510 op->thread = thread;
511 mailsem_up(thread->op_sem);
512
513 return NO_ERROR;
514 }
515
etpan_thread_op_lock(struct etpan_thread_op * op)516 static void etpan_thread_op_lock(struct etpan_thread_op * op)
517 {
518 pthread_mutex_lock(&op->lock);
519 }
520
etpan_thread_op_unlock(struct etpan_thread_op * op)521 static void etpan_thread_op_unlock(struct etpan_thread_op * op)
522 {
523 pthread_mutex_unlock(&op->lock);
524 }
525
etpan_thread_op_cancelled(struct etpan_thread_op * op)526 static int etpan_thread_op_cancelled(struct etpan_thread_op * op)
527 {
528 int cancelled;
529
530 cancelled = 0;
531 etpan_thread_op_lock(op);
532 if (op->cancellable)
533 cancelled = op->cancelled;
534 etpan_thread_op_unlock(op);
535
536 return cancelled;
537 }
538
etpan_thread_manager_get_fd(struct etpan_thread_manager * manager)539 int etpan_thread_manager_get_fd(struct etpan_thread_manager * manager)
540 {
541 return manager->notify_fds[0];
542 }
543
/*
 * Moves the finished operations of every thread in thread_list into
 * op_to_notify, holding each thread's lock while its op_done_list is
 * drained.
 *
 * When op_to_notify cannot grow, a warning is logged and the operations
 * that could not be moved are kept at the front of the thread's
 * op_done_list so that a later pass can pick them up. Previously the list
 * was emptied unconditionally and those operations were dropped without
 * their callback or cleanup ever running.
 */
static void loop_thread_list(carray * op_to_notify,
                             carray * thread_list)
{
  unsigned int i;

  for(i = 0 ; i < carray_count(thread_list) ; i ++) {
    struct etpan_thread * thread;
    unsigned int count;
    unsigned int moved;
    unsigned int k;

    thread = carray_get(thread_list, i);

    thread_lock(thread);

    count = carray_count(thread->op_done_list);
    for(moved = 0 ; moved < count ; moved ++) {
      struct etpan_thread_op * op;

      op = carray_get(thread->op_done_list, moved);
      if (carray_add(op_to_notify, op, NULL) < 0) {
        g_warning("complete failure of thread due to lack of memory (callback)");
        break;
      }
    }

    /* shift whatever was not transferred to the front; shrinking the
       array afterwards removes only the transferred entries */
    for(k = moved ; k < count ; k ++) {
      carray_set(thread->op_done_list, k - moved,
                 carray_get(thread->op_done_list, k));
    }
    carray_set_size(thread->op_done_list, count - moved);

    thread_unlock(thread);
  }
}
573
etpan_thread_manager_loop(struct etpan_thread_manager * manager)574 void etpan_thread_manager_loop(struct etpan_thread_manager * manager)
575 {
576 carray * op_to_notify;
577 unsigned int i;
578
579 manager_ack(manager);
580
581 op_to_notify = carray_new(OP_INIT_SIZE);
582
583 loop_thread_list(op_to_notify, manager->thread_pool);
584 loop_thread_list(op_to_notify, manager->thread_pending);
585
586 for(i = 0 ; i < carray_count(op_to_notify) ; i ++) {
587 struct etpan_thread_op * op;
588
589 op = carray_get(op_to_notify, i);
590
591 etpan_thread_op_lock(op);
592
593 if (!op->callback_called) {
594 if (op->callback != NULL)
595 op->callback(op->cancelled, op->result, op->callback_data);
596 }
597
598 etpan_thread_op_unlock(op);
599
600 if (op->cleanup != NULL)
601 op->cleanup(op);
602 }
603
604 carray_free(op_to_notify);
605
606 i = 0;
607 while (i < carray_count(manager->thread_pending)) {
608 struct etpan_thread * thread;
609
610 thread = carray_get(manager->thread_pending, i);
611
612 if (etpan_thread_is_stopped(thread)) {
613 etpan_thread_join(thread);
614
615 etpan_thread_free(thread);
616
617 carray_delete_slow(manager->thread_pending, i);
618 }
619 else {
620 i ++;
621 }
622 }
623 }
624
625 #if 0
626 static void etpan_thread_manager_start(struct etpan_thread_manager * manager)
627 {
628 /* do nothing */
629 }
630 #endif
631
etpan_thread_manager_stop(struct etpan_thread_manager * manager)632 void etpan_thread_manager_stop(struct etpan_thread_manager * manager)
633 {
634 while (carray_count(manager->thread_pool) > 0) {
635 struct etpan_thread * thread;
636
637 thread = carray_get(manager->thread_pool, 0);
638 etpan_thread_manager_terminate_thread(manager, thread);
639 }
640 }
641
etpan_thread_manager_is_stopped(struct etpan_thread_manager * manager)642 static int etpan_thread_manager_is_stopped(struct etpan_thread_manager * manager)
643 {
644 return ((carray_count(manager->thread_pending) == 0) &&
645 (carray_count(manager->thread_pool) == 0));
646 }
647
/*
 * Runs the manager loop until no thread is left in either the pool or the
 * pending list.
 */
void etpan_thread_manager_join(struct etpan_thread_manager * manager)
{
  for (;;) {
    if (etpan_thread_manager_is_stopped(manager))
      return;

    etpan_thread_manager_loop(manager);
  }
}
654 #endif
655