1 /*
2 * Copyright 2008-2013 Various Authors
3 * Copyright 2004 Timo Hirvonen
4 *
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License as
7 * published by the Free Software Foundation; either version 2 of the
8 * License, or (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, see <http://www.gnu.org/licenses/>.
17 */
18
19 #include "worker.h"
20 #include "locking.h"
21 #include "list.h"
22 #include "xmalloc.h"
23 #include "debug.h"
24 #include "job.h"
25
26 #include <stdlib.h>
27 #include <stdint.h>
28 #include <pthread.h>
29
/*
 * One queued unit of work.  Allocated in worker_add_job(); freed either by
 * the worker thread after running it, or by worker_remove_jobs_by_cb() if
 * it is removed while still queued.
 */
struct worker_job {
	struct list_head node;	/* links this job into worker_job_head */

	uint32_t type;			/* bitmask matched by the remove/has-job helpers */
	void (*job_cb)(void *data);	/* does the actual work (runs unlocked) */
	void (*free_cb)(void *data);	/* releases @data once the job is done or cancelled */
	void *data;			/* opaque payload handed to both callbacks */
};
38
/* Lifecycle of the worker thread; the current value lives in `state`,
 * guarded by worker_mutex. */
enum worker_state {
	WORKER_PAUSED,	/* initial state: jobs may be queued but are not run */
	WORKER_RUNNING,	/* worker_loop() consumes jobs from the queue */
	WORKER_STOPPED,	/* worker_loop() exits at its next wakeup */
};
44
/* FIFO of pending jobs; guarded by worker_mutex */
static LIST_HEAD(worker_job_head);
static pthread_mutex_t worker_mutex = CMUS_MUTEX_INITIALIZER;
/* signalled when a job is queued, the state changes, or the running job
 * finishes while cancel_current is set */
static pthread_cond_t worker_cond = PTHREAD_COND_INITIALIZER;
static pthread_t worker_thread;
/* guarded by worker_mutex */
static enum worker_state state = WORKER_PAUSED;
/* set (under worker_mutex) by worker_remove_jobs_by_cb() to request a
 * wakeup when the currently running job completes */
static int cancel_current = 0;

/*
 * - only worker thread modifies this
 * - cur_job->job_cb can read this without locking
 * - anyone else must lock worker before reading this
 */
static struct worker_job *cur_job = NULL;

#define worker_lock() cmus_mutex_lock(&worker_mutex)
#define worker_unlock() cmus_mutex_unlock(&worker_mutex)
61
/*
 * Worker thread entry point: pops jobs off worker_job_head and runs them
 * until state becomes WORKER_STOPPED.
 *
 * Locking: worker_mutex is held while the state and the queue are
 * inspected, and released around job_cb() so producers are not blocked
 * while a job runs.  cur_job is only written here (see its comment).
 */
static void *worker_loop(void *arg)
{
	srand(time(NULL));

	worker_lock();
	while (1) {
		if (state != WORKER_RUNNING || list_empty(&worker_job_head)) {
			int rc;

			if (state == WORKER_STOPPED)
				break;

			/* paused or no work: sleep until something changes */
			rc = pthread_cond_wait(&worker_cond, &worker_mutex);
			if (rc)
				d_print("pthread_cond_wait: %s\n", strerror(rc));
		} else {
			struct list_head *item = worker_job_head.next;
			uint64_t t;

			list_del(item);
			cur_job = container_of(item, struct worker_job, node);
			worker_unlock();

			/* run the job unlocked and time it for debugging */
			t = timer_get();
			cur_job->job_cb(cur_job->data);
			timer_print("worker job", timer_get() - t);

			worker_lock();
			cur_job->free_cb(cur_job->data);
			free(cur_job);
			cur_job = NULL;

			// wakeup worker_remove_jobs_*() if needed
			if (cancel_current) {
				cancel_current = 0;
				pthread_cond_signal(&worker_cond);
			}
		}
	}
	worker_unlock();
	return NULL;
}
104
/* Spawns the worker thread.  Must be called once before any other
 * worker_*() function; aborts (BUG_ON) if thread creation fails. */
void worker_init(void)
{
	int rc = pthread_create(&worker_thread, NULL, worker_loop, NULL);

	BUG_ON(rc);
}
111
worker_set_state(enum worker_state s)112 static void worker_set_state(enum worker_state s)
113 {
114 worker_lock();
115 state = s;
116 pthread_cond_signal(&worker_cond);
117 worker_unlock();
118 }
119
/* Lets queued jobs begin executing (leaves the initial WORKER_PAUSED state). */
void worker_start(void)
{
	worker_set_state(WORKER_RUNNING);
}
124
/* Asks the worker thread to stop and blocks until it has terminated. */
void worker_exit(void)
{
	worker_set_state(WORKER_STOPPED);
	pthread_join(worker_thread, NULL);
}
130
worker_add_job(uint32_t type,void (* job_cb)(void * data),void (* free_cb)(void * data),void * data)131 void worker_add_job(uint32_t type, void (*job_cb)(void *data),
132 void (*free_cb)(void *data), void *data)
133 {
134 struct worker_job *job;
135
136 job = xnew(struct worker_job, 1);
137 job->type = type;
138 job->job_cb = job_cb;
139 job->free_cb = free_cb;
140 job->data = data;
141
142 worker_lock();
143 list_add_tail(&job->node, &worker_job_head);
144 pthread_cond_signal(&worker_cond);
145 worker_unlock();
146 }
147
/*
 * Match callback for the *_by_type() helpers: returns 1 when the job's
 * type bitmask intersects the pattern pointed to by @opaque, else 0.
 * @job_data is unused.
 */
static int worker_matches_type(uint32_t type, void *job_data,
		void *opaque)
{
	const uint32_t *pat = opaque;

	return (type & *pat) != 0;
}
154
/* Removes every queued job whose type matches the bitmask @pat; if the
 * currently running job also matches, waits for it to finish. */
void worker_remove_jobs_by_type(uint32_t pat)
{
	worker_remove_jobs_by_cb(worker_matches_type, &pat);
}
159
/*
 * Removes every queued job for which @cb returns non-zero, calling its
 * free_cb so the job data is released.  If the currently executing job
 * also matches, blocks until it has finished (worker_loop() clears
 * cancel_current and signals worker_cond once the job completes).
 *
 * @cb:     predicate invoked as cb(job->type, job->data, @opaque)
 * @opaque: passed through to @cb unchanged
 */
void worker_remove_jobs_by_cb(worker_match_cb cb, void *opaque)
{
	struct list_head *item;

	worker_lock();

	/* walk the list manually so matched nodes can be unlinked mid-walk */
	item = worker_job_head.next;
	while (item != &worker_job_head) {
		struct worker_job *job = container_of(item, struct worker_job,
				node);
		struct list_head *next = item->next;

		if (cb(job->type, job->data, opaque)) {
			list_del(&job->node);
			job->free_cb(job->data);
			free(job);
		}
		item = next;
	}

	/* wait for the current job to finish if it's of the specified type */
	if (cur_job && cb(cur_job->type, cur_job->data, opaque)) {
		cancel_current = 1;
		while (cancel_current)
			pthread_cond_wait(&worker_cond, &worker_mutex);
	}

	worker_unlock();
}
189
/* Returns non-zero if a queued (or currently running) job's type matches
 * the bitmask @pat. */
int worker_has_job_by_type(uint32_t pat)
{
	return worker_has_job_by_cb(worker_matches_type, &pat);
}
194
/*
 * Returns non-zero if any queued job, or the currently running job,
 * matches predicate @cb (invoked as cb(type, data, @opaque)).
 */
int worker_has_job_by_cb(worker_match_cb cb, void *opaque)
{
	struct worker_job *job;
	int has_job = 0;

	worker_lock();
	list_for_each_entry(job, &worker_job_head, node) {
		if (cb(job->type, job->data, opaque)) {
			has_job = 1;
			break;
		}
	}
	/*
	 * Bug fix: the running job must be tested via cur_job.  The old code
	 * dereferenced the loop cursor `job`, which after a full (break-less)
	 * list_for_each_entry pass is a bogus container_of of the list head,
	 * not a valid worker_job.
	 */
	if (!has_job && cur_job && cb(cur_job->type, cur_job->data, opaque))
		has_job = 1;
	worker_unlock();
	return has_job;
}
212
/*
 * this is only called from the worker thread
 * cur_job is guaranteed to be non-NULL
 *
 * Lets a running job_cb poll whether worker_remove_jobs_*() is waiting
 * for it, so it can bail out early.  Read without locking: see the
 * comment above cur_job.
 */
int worker_cancelling(void)
{
	return cancel_current;
}
221