1 /*
2  * Claws Mail -- a GTK+ based, lightweight, and fast e-mail client
3  * Copyright (C) 2005-2012 DINH Viet Hoa and the Claws Mail team
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 3 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program. If not, see <http://www.gnu.org/licenses/>.
17  *
18  */
19 
20 #ifdef HAVE_CONFIG_H
21 #  include "config.h"
22 #include "claws-features.h"
23 #endif
24 
25 #ifdef HAVE_LIBETPAN
26 
27 #include "etpan-thread-manager.h"
28 
29 #include <glib.h>
30 #include <stdlib.h>
31 #include <pthread.h>
32 #include <libetpan/mailsem.h>
33 #include <semaphore.h>
34 #include <unistd.h>
35 #include <fcntl.h>
36 
37 #include "etpan-errors.h"
38 #include "utils.h"
39 
40 #define POOL_UNBOUND_MAX 4
41 
42 #define POOL_INIT_SIZE 8
43 #define OP_INIT_SIZE 8
44 
45 static int etpan_thread_start(struct etpan_thread * thread);
46 static void etpan_thread_free(struct etpan_thread * thread);
47 static unsigned int etpan_thread_get_load(struct etpan_thread * thread);
48 static int etpan_thread_is_bound(struct etpan_thread * thread);
49 static int etpan_thread_manager_is_stopped(struct etpan_thread_manager * manager);
50 static void etpan_thread_join(struct etpan_thread * thread);
51 static struct etpan_thread * etpan_thread_new(void);
52 static int etpan_thread_op_cancelled(struct etpan_thread_op * op);
53 static void etpan_thread_op_lock(struct etpan_thread_op * op);
54 static void etpan_thread_op_unlock(struct etpan_thread_op * op);
55 static void etpan_thread_stop(struct etpan_thread * thread);
56 
57 #if 0
58 static void etpan_thread_bind(struct etpan_thread * thread);
59 static int etpan_thread_manager_op_schedule(struct etpan_thread_manager * manager,
60 	     struct etpan_thread_op * op);
61 static void etpan_thread_manager_start(struct etpan_thread_manager * manager);
62 static void etpan_thread_op_cancel(struct etpan_thread_op * op);
63 #endif
64 
65 enum {
66   TERMINATE_STATE_NONE,
67   TERMINATE_STATE_REQUESTED,
68   TERMINATE_STATE_DONE,
69 };
70 
etpan_thread_manager_new(void)71 struct etpan_thread_manager * etpan_thread_manager_new(void)
72 {
73   struct etpan_thread_manager * manager;
74   int r;
75 
76   manager = malloc(sizeof(* manager));
77   if (manager == NULL)
78     goto err;
79 
80   manager->thread_pool = carray_new(POOL_INIT_SIZE);
81   if (manager->thread_pool == NULL)
82     goto free;
83 
84   manager->thread_pending = carray_new(POOL_INIT_SIZE);
85   if (manager->thread_pending == NULL)
86     goto free_pool;
87 
88   manager->can_create_thread = 1;
89   manager->unbound_count = 0;
90 
91   r = pipe(manager->notify_fds);
92   if (r < 0)
93     goto free_pending;
94 
95   return manager;
96 
97  free_pending:
98   carray_free(manager->thread_pending);
99  free_pool:
100   carray_free(manager->thread_pool);
101  free:
102   free(manager);
103  err:
104   return NULL;
105 }
106 
etpan_thread_manager_free(struct etpan_thread_manager * manager)107 void etpan_thread_manager_free(struct etpan_thread_manager * manager)
108 {
109   close(manager->notify_fds[1]);
110   close(manager->notify_fds[0]);
111   carray_free(manager->thread_pending);
112   carray_free(manager->thread_pool);
113   free(manager);
114 }
115 
etpan_thread_new(void)116 static struct etpan_thread * etpan_thread_new(void)
117 {
118   struct etpan_thread * thread;
119   int r;
120 
121   thread = malloc(sizeof(* thread));
122   if (thread == NULL)
123     goto err;
124 
125   r = pthread_mutex_init(&thread->lock, NULL);
126   if (r != 0)
127     goto free;
128 
129   thread->op_list = carray_new(OP_INIT_SIZE);
130   if (thread->op_list == NULL)
131     goto destroy_lock;
132 
133   thread->op_done_list = carray_new(OP_INIT_SIZE);
134   if (thread->op_done_list == NULL)
135     goto free_op_list;
136 
137   thread->start_sem = mailsem_new();
138   if (thread->start_sem == NULL)
139     goto free_op_done_list;
140 
141   thread->stop_sem = mailsem_new();
142   if (thread->stop_sem == NULL)
143     goto free_startsem;
144 
145   thread->op_sem = mailsem_new();
146   if (thread->op_sem == NULL)
147     goto free_stopsem;
148 
149   thread->manager = NULL;
150   thread->bound_count = 0;
151   thread->terminate_state = TERMINATE_STATE_NONE;
152 
153   return thread;
154 
155  free_stopsem:
156   mailsem_free(thread->stop_sem);
157  free_startsem:
158   mailsem_free(thread->start_sem);
159  free_op_done_list:
160   carray_free(thread->op_done_list);
161  free_op_list:
162   carray_free(thread->op_list);
163  destroy_lock:
164   pthread_mutex_destroy(&thread->lock);
165  free:
166   free(thread);
167  err:
168   return NULL;
169 }
170 
etpan_thread_free(struct etpan_thread * thread)171 static void etpan_thread_free(struct etpan_thread * thread)
172 {
173   mailsem_free(thread->op_sem);
174   mailsem_free(thread->stop_sem);
175   mailsem_free(thread->start_sem);
176   carray_free(thread->op_done_list);
177   carray_free(thread->op_list);
178   pthread_mutex_destroy(&thread->lock);
179   free(thread);
180 }
181 
etpan_thread_op_new(void)182 struct etpan_thread_op * etpan_thread_op_new(void)
183 {
184   struct etpan_thread_op * op;
185   int r;
186 
187   op = malloc(sizeof(* op));
188   if (op == NULL)
189     goto err;
190 
191   memset(op, 0, sizeof(* op));
192 
193   r = pthread_mutex_init(&op->lock, NULL);
194   if (r != 0)
195     goto free;
196 
197   return op;
198 
199  free:
200   free(op);
201  err:
202   return NULL;
203 }
204 
etpan_thread_op_free(struct etpan_thread_op * op)205 void etpan_thread_op_free(struct etpan_thread_op * op)
206 {
207   pthread_mutex_destroy(&op->lock);
208   free(op);
209 }
210 
211 static struct etpan_thread *
etpan_thread_manager_create_thread(struct etpan_thread_manager * manager)212 etpan_thread_manager_create_thread(struct etpan_thread_manager * manager)
213 {
214   struct etpan_thread * thread;
215   int r;
216 
217   thread = etpan_thread_new();
218   if (thread == NULL)
219     goto err;
220 
221   thread->manager = manager;
222 
223   r = etpan_thread_start(thread);
224   if (r != NO_ERROR)
225     goto free_thread;
226 
227   r = carray_add(manager->thread_pool, thread, NULL);
228   if (r < 0) {
229     etpan_thread_stop(thread);
230     goto free_thread;
231   }
232 
233   return thread;
234 
235  free_thread:
236   etpan_thread_free(thread);
237  err:
238   return NULL;
239 }
240 
241 static void
etpan_thread_manager_terminate_thread(struct etpan_thread_manager * manager,struct etpan_thread * thread)242 etpan_thread_manager_terminate_thread(struct etpan_thread_manager * manager,
243     struct etpan_thread * thread)
244 {
245   unsigned int i;
246   int r;
247 
248   for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
249     if (carray_get(manager->thread_pool, i) == thread) {
250       carray_delete(manager->thread_pool, i);
251       break;
252     }
253   }
254 
255   if (!etpan_thread_is_bound(thread))
256     manager->unbound_count --;
257 
258   r = carray_add(manager->thread_pending, thread, NULL);
259   if (r < 0) {
260     g_warning("complete failure of thread due to lack of memory (thread stop)");
261   }
262 
263   etpan_thread_stop(thread);
264 }
265 
manager_notify(struct etpan_thread_manager * manager)266 static void manager_notify(struct etpan_thread_manager * manager)
267 {
268   char ch;
269   ssize_t r;
270 
271   ch = 1;
272   r = write(manager->notify_fds[1], &ch, 1);
273   if (r < 0) {
274     g_warning("error writing notification to etpan thread manager");
275   }
276 }
277 
manager_ack(struct etpan_thread_manager * manager)278 static void manager_ack(struct etpan_thread_manager * manager)
279 {
280 #ifndef G_OS_WIN32
281   char ch;
282   ssize_t r;
283   r = read(manager->notify_fds[0], &ch, 1);
284   if (r != 1) {
285     g_warning("error reading notification from etpan thread manager");
286   }
287 #else
288   /* done in the GIOChannel handler in imap-thread.c and nntp-thread.c */
289 #endif
290 }
291 
thread_lock(struct etpan_thread * thread)292 static void thread_lock(struct etpan_thread * thread)
293 {
294   pthread_mutex_lock(&thread->lock);
295 }
296 
thread_unlock(struct etpan_thread * thread)297 static void thread_unlock(struct etpan_thread * thread)
298 {
299   pthread_mutex_unlock(&thread->lock);
300 }
301 
thread_notify(struct etpan_thread * thread)302 static void thread_notify(struct etpan_thread * thread)
303 {
304   manager_notify(thread->manager);
305 }
306 
thread_run(void * data)307 static void * thread_run(void * data)
308 {
309   struct etpan_thread * thread;
310   int r;
311 
312   thread = data;
313 
314   mailsem_up(thread->start_sem);
315 
316   while (1) {
317     int do_quit;
318     struct etpan_thread_op * op;
319 
320     mailsem_down(thread->op_sem);
321 
322     do_quit = 0;
323     op = NULL;
324     thread_lock(thread);
325     if (carray_count(thread->op_list) > 0) {
326       op = carray_get(thread->op_list, 0);
327       carray_delete_slow(thread->op_list, 0);
328     }
329     else {
330       do_quit = 1;
331     }
332     thread_unlock(thread);
333 
334     if (do_quit) {
335       break;
336     }
337 
338     if (!etpan_thread_op_cancelled(op)) {
339       if (op->run != NULL)
340         op->run(op);
341     }
342 
343     thread_lock(thread);
344     r = carray_add(thread->op_done_list, op, NULL);
345     if (r < 0) {
346       g_warning("complete failure of thread due to lack of memory (op done)");
347     }
348     thread_unlock(thread);
349 
350     thread_notify(thread);
351   }
352 
353   thread_lock(thread);
354   thread->terminate_state = TERMINATE_STATE_DONE;
355   thread_unlock(thread);
356 
357   thread_notify(thread);
358 
359   mailsem_up(thread->stop_sem);
360 
361   return NULL;
362 }
363 
etpan_thread_start(struct etpan_thread * thread)364 static int etpan_thread_start(struct etpan_thread * thread)
365 {
366   int r;
367 
368   r = pthread_create(&thread->th_id, NULL, thread_run, thread);
369   if (r != 0)
370     return ERROR_MEMORY;
371 
372   mailsem_down(thread->start_sem);
373 
374   return NO_ERROR;
375 }
376 
etpan_thread_stop(struct etpan_thread * thread)377 static void etpan_thread_stop(struct etpan_thread * thread)
378 {
379   thread_lock(thread);
380   thread->terminate_state = TERMINATE_STATE_REQUESTED;
381   thread_unlock(thread);
382 
383   mailsem_up(thread->op_sem);
384 
385   /* this thread will be joined in the manager loop */
386 }
387 
etpan_thread_is_stopped(struct etpan_thread * thread)388 static int etpan_thread_is_stopped(struct etpan_thread * thread)
389 {
390   int stopped;
391 
392   thread_lock(thread);
393   stopped = (thread->terminate_state == TERMINATE_STATE_DONE);
394   thread_unlock(thread);
395 
396   return stopped;
397 }
398 
etpan_thread_join(struct etpan_thread * thread)399 static void etpan_thread_join(struct etpan_thread * thread)
400 {
401   mailsem_down(thread->stop_sem);
402   pthread_join(thread->th_id, NULL);
403 }
404 
405 struct etpan_thread *
etpan_thread_manager_get_thread(struct etpan_thread_manager * manager)406 etpan_thread_manager_get_thread(struct etpan_thread_manager * manager)
407 {
408   struct etpan_thread * chosen_thread;
409   unsigned int chosen_thread_load;
410   unsigned int i;
411   struct etpan_thread * thread;
412 
413   /* chose a thread */
414 
415   chosen_thread = NULL;
416   chosen_thread_load = 0;
417 
418   for(i = 0 ; i < carray_count(manager->thread_pool) ; i ++) {
419     thread = carray_get(manager->thread_pool, i);
420     if (etpan_thread_is_bound(thread))
421       continue;
422 
423     if (chosen_thread == NULL) {
424       chosen_thread = thread;
425       chosen_thread_load = etpan_thread_get_load(thread);
426 
427       if (chosen_thread_load == 0)
428         break;
429     }
430     else {
431       unsigned int load;
432 
433       load = etpan_thread_get_load(thread);
434 
435       if (load < chosen_thread_load) {
436         chosen_thread = thread;
437         chosen_thread_load = load;
438       }
439     }
440   }
441 
442   if (chosen_thread != NULL) {
443     if (manager->can_create_thread && (chosen_thread_load != 0)) {
444       chosen_thread = NULL;
445     }
446   }
447 
448   /* choice done */
449 
450   if (chosen_thread != NULL)
451     return chosen_thread;
452 
453   thread = etpan_thread_manager_create_thread(manager);
454   if (thread == NULL)
455     goto err;
456 
457   manager->unbound_count ++;
458   if (manager->unbound_count >= POOL_UNBOUND_MAX)
459     manager->can_create_thread = 0;
460 
461   return thread;
462 
463  err:
464   return NULL;
465 }
466 
etpan_thread_get_load(struct etpan_thread * thread)467 static unsigned int etpan_thread_get_load(struct etpan_thread * thread)
468 {
469   unsigned int load;
470 
471   thread_lock(thread);
472   load = carray_count(thread->op_list);
473   thread_unlock(thread);
474 
475   return load;
476 }
477 
478 #if 0
479 static void etpan_thread_bind(struct etpan_thread * thread)
480 {
481   thread->bound_count ++;
482 }
483 #endif
484 
etpan_thread_unbind(struct etpan_thread * thread)485 void etpan_thread_unbind(struct etpan_thread * thread)
486 {
487   thread->bound_count --;
488 }
489 
etpan_thread_is_bound(struct etpan_thread * thread)490 static int etpan_thread_is_bound(struct etpan_thread * thread)
491 {
492   return (thread->bound_count != 0);
493 }
494 
etpan_thread_op_schedule(struct etpan_thread * thread,struct etpan_thread_op * op)495 int etpan_thread_op_schedule(struct etpan_thread * thread,
496                              struct etpan_thread_op * op)
497 {
498   int r;
499 
500   if (thread->terminate_state != TERMINATE_STATE_NONE)
501     return ERROR_INVAL;
502 
503   thread_lock(thread);
504   r = carray_add(thread->op_list, op, NULL);
505   thread_unlock(thread);
506 
507   if (r < 0)
508     return ERROR_MEMORY;
509 
510   op->thread = thread;
511   mailsem_up(thread->op_sem);
512 
513   return NO_ERROR;
514 }
515 
etpan_thread_op_lock(struct etpan_thread_op * op)516 static void etpan_thread_op_lock(struct etpan_thread_op * op)
517 {
518   pthread_mutex_lock(&op->lock);
519 }
520 
etpan_thread_op_unlock(struct etpan_thread_op * op)521 static void etpan_thread_op_unlock(struct etpan_thread_op * op)
522 {
523   pthread_mutex_unlock(&op->lock);
524 }
525 
etpan_thread_op_cancelled(struct etpan_thread_op * op)526 static int etpan_thread_op_cancelled(struct etpan_thread_op * op)
527 {
528   int cancelled;
529 
530   cancelled = 0;
531   etpan_thread_op_lock(op);
532   if (op->cancellable)
533     cancelled = op->cancelled;
534   etpan_thread_op_unlock(op);
535 
536   return cancelled;
537 }
538 
etpan_thread_manager_get_fd(struct etpan_thread_manager * manager)539 int etpan_thread_manager_get_fd(struct etpan_thread_manager * manager)
540 {
541   return manager->notify_fds[0];
542 }
543 
loop_thread_list(carray * op_to_notify,carray * thread_list)544 static void loop_thread_list(carray * op_to_notify,
545     carray * thread_list)
546 {
547   unsigned int i;
548   int r;
549 
550   for(i = 0 ; i < carray_count(thread_list) ; i ++) {
551     struct etpan_thread * thread;
552     unsigned int j;
553 
554     thread = carray_get(thread_list, i);
555 
556     thread_lock(thread);
557 
558     for(j = 0 ; j < carray_count(thread->op_done_list) ; j ++) {
559       struct etpan_thread_op * op;
560 
561       op = carray_get(thread->op_done_list, j);
562       r = carray_add(op_to_notify, op, NULL);
563       if (r < 0) {
564         g_warning("complete failure of thread due to lack of memory (callback)");
565         break;
566       }
567     }
568     carray_set_size(thread->op_done_list, 0);
569 
570     thread_unlock(thread);
571   }
572 }
573 
etpan_thread_manager_loop(struct etpan_thread_manager * manager)574 void etpan_thread_manager_loop(struct etpan_thread_manager * manager)
575 {
576   carray * op_to_notify;
577   unsigned int i;
578 
579   manager_ack(manager);
580 
581   op_to_notify = carray_new(OP_INIT_SIZE);
582 
583   loop_thread_list(op_to_notify, manager->thread_pool);
584   loop_thread_list(op_to_notify, manager->thread_pending);
585 
586   for(i = 0 ; i < carray_count(op_to_notify) ; i ++) {
587     struct etpan_thread_op * op;
588 
589     op = carray_get(op_to_notify, i);
590 
591     etpan_thread_op_lock(op);
592 
593     if (!op->callback_called) {
594       if (op->callback != NULL)
595         op->callback(op->cancelled, op->result, op->callback_data);
596     }
597 
598     etpan_thread_op_unlock(op);
599 
600     if (op->cleanup != NULL)
601       op->cleanup(op);
602   }
603 
604   carray_free(op_to_notify);
605 
606   i = 0;
607   while (i < carray_count(manager->thread_pending)) {
608     struct etpan_thread * thread;
609 
610     thread = carray_get(manager->thread_pending, i);
611 
612     if (etpan_thread_is_stopped(thread)) {
613       etpan_thread_join(thread);
614 
615       etpan_thread_free(thread);
616 
617       carray_delete_slow(manager->thread_pending, i);
618     }
619     else {
620       i ++;
621     }
622   }
623 }
624 
625 #if 0
626 static void etpan_thread_manager_start(struct etpan_thread_manager * manager)
627 {
628   /* do nothing */
629 }
630 #endif
631 
etpan_thread_manager_stop(struct etpan_thread_manager * manager)632 void etpan_thread_manager_stop(struct etpan_thread_manager * manager)
633 {
634   while (carray_count(manager->thread_pool) > 0) {
635     struct etpan_thread * thread;
636 
637     thread = carray_get(manager->thread_pool, 0);
638     etpan_thread_manager_terminate_thread(manager, thread);
639   }
640 }
641 
etpan_thread_manager_is_stopped(struct etpan_thread_manager * manager)642 static int etpan_thread_manager_is_stopped(struct etpan_thread_manager * manager)
643 {
644   return ((carray_count(manager->thread_pending) == 0) &&
645       (carray_count(manager->thread_pool) == 0));
646 }
647 
etpan_thread_manager_join(struct etpan_thread_manager * manager)648 void etpan_thread_manager_join(struct etpan_thread_manager * manager)
649 {
650   while (!etpan_thread_manager_is_stopped(manager)) {
651     etpan_thread_manager_loop(manager);
652   }
653 }
654 #endif
655