1 /*
2  * Copyright 2008-2013 Various Authors
3  * Copyright 2004 Timo Hirvonen
4  *
5  * This program is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU General Public License as
7  * published by the Free Software Foundation; either version 2 of the
8  * License, or (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, see <http://www.gnu.org/licenses/>.
17  */
18 
19 #include "worker.h"
20 #include "locking.h"
21 #include "list.h"
22 #include "xmalloc.h"
23 #include "debug.h"
24 #include "job.h"
25 
26 #include <stdlib.h>
27 #include <stdint.h>
28 #include <pthread.h>
29 
/* One unit of work handed to the worker thread. */
struct worker_job {
	/* links the job into worker_job_head while it is queued */
	struct list_head node;

	/* caller-supplied tag; worker_matches_type() tests it as a bitmask */
	uint32_t type;
	/* performs the work; worker_loop() calls it with the worker lock released */
	void (*job_cb)(void *data);
	/* disposes of data; called after job_cb returns or when the job is removed unrun */
	void (*free_cb)(void *data);
	/* opaque argument passed to both callbacks */
	void *data;
};
38 
/* Run state of the worker thread, examined by worker_loop(). */
enum worker_state {
	/* jobs stay queued; the worker waits on worker_cond */
	WORKER_PAUSED,
	/* queued jobs are taken from the head of the list and executed */
	WORKER_RUNNING,
	/* worker_loop() leaves its loop when it observes this state */
	WORKER_STOPPED,
};
44 
/* pending jobs; worker_add_job() appends, worker_loop() takes from the front */
static LIST_HEAD(worker_job_head);
/* held around every access to the job queue and to state in this file */
static pthread_mutex_t worker_mutex = CMUS_MUTEX_INITIALIZER;
/*
 * signalled when a job is queued, when the state changes, and when the
 * running job ends while worker_remove_jobs_by_cb() is waiting for it
 */
static pthread_cond_t worker_cond = PTHREAD_COND_INITIALIZER;
/* thread created by worker_init() and joined by worker_exit() */
static pthread_t worker_thread;
/* initial state is paused; worker_start() switches to WORKER_RUNNING */
static enum worker_state state = WORKER_PAUSED;
/* non-zero while worker_remove_jobs_by_cb() waits for the running job */
static int cancel_current = 0;

/*
 * - only the worker thread modifies this
 * - cur_job->job_cb can read this without locking
 * - anyone else must lock worker before reading this
 */
static struct worker_job *cur_job = NULL;

/* shorthand for taking and releasing worker_mutex */
#define worker_lock() cmus_mutex_lock(&worker_mutex)
#define worker_unlock() cmus_mutex_unlock(&worker_mutex)
61 
worker_loop(void * arg)62 static void *worker_loop(void *arg)
63 {
64 	srand(time(NULL));
65 
66 	worker_lock();
67 	while (1) {
68 		if (state != WORKER_RUNNING || list_empty(&worker_job_head)) {
69 			int rc;
70 
71 			if (state == WORKER_STOPPED)
72 				break;
73 
74 			rc = pthread_cond_wait(&worker_cond, &worker_mutex);
75 			if (rc)
76 				d_print("pthread_cond_wait: %s\n", strerror(rc));
77 		} else {
78 			struct list_head *item = worker_job_head.next;
79 			uint64_t t;
80 
81 			list_del(item);
82 			cur_job = container_of(item, struct worker_job, node);
83 			worker_unlock();
84 
85 			t = timer_get();
86 			cur_job->job_cb(cur_job->data);
87 			timer_print("worker job", timer_get() - t);
88 
89 			worker_lock();
90 			cur_job->free_cb(cur_job->data);
91 			free(cur_job);
92 			cur_job = NULL;
93 
94 			// wakeup worker_remove_jobs_*() if needed
95 			if (cancel_current) {
96 				cancel_current = 0;
97 				pthread_cond_signal(&worker_cond);
98 			}
99 		}
100 	}
101 	worker_unlock();
102 	return NULL;
103 }
104 
worker_init(void)105 void worker_init(void)
106 {
107 	int rc = pthread_create(&worker_thread, NULL, worker_loop, NULL);
108 
109 	BUG_ON(rc);
110 }
111 
worker_set_state(enum worker_state s)112 static void worker_set_state(enum worker_state s)
113 {
114 	worker_lock();
115 	state = s;
116 	pthread_cond_signal(&worker_cond);
117 	worker_unlock();
118 }
119 
worker_start(void)120 void worker_start(void)
121 {
122 	worker_set_state(WORKER_RUNNING);
123 }
124 
worker_exit(void)125 void worker_exit(void)
126 {
127 	worker_set_state(WORKER_STOPPED);
128 	pthread_join(worker_thread, NULL);
129 }
130 
worker_add_job(uint32_t type,void (* job_cb)(void * data),void (* free_cb)(void * data),void * data)131 void worker_add_job(uint32_t type, void (*job_cb)(void *data),
132 		void (*free_cb)(void *data), void *data)
133 {
134 	struct worker_job *job;
135 
136 	job = xnew(struct worker_job, 1);
137 	job->type = type;
138 	job->job_cb = job_cb;
139 	job->free_cb = free_cb;
140 	job->data = data;
141 
142 	worker_lock();
143 	list_add_tail(&job->node, &worker_job_head);
144 	pthread_cond_signal(&worker_cond);
145 	worker_unlock();
146 }
147 
/*
 * Match callback that treats the job type as a bitmask.
 *
 * opaque must point to a uint32_t pattern; job_data is ignored.
 * Returns 1 when type and the pattern share at least one set bit, else 0.
 */
static int worker_matches_type(uint32_t type, void *job_data,
		void *opaque)
{
	const uint32_t wanted = *(uint32_t *)opaque;

	return (type & wanted) != 0;
}
154 
worker_remove_jobs_by_type(uint32_t pat)155 void worker_remove_jobs_by_type(uint32_t pat)
156 {
157 	worker_remove_jobs_by_cb(worker_matches_type, &pat);
158 }
159 
/*
 * Drop all queued jobs accepted by cb and, if the job currently running is
 * accepted too, block until the worker has finished with it.
 *
 * cb is called as cb(type, data, opaque) with the worker lock held; a
 * non-zero result selects the job.  Selected queued jobs are unlinked and
 * released through their free_cb without ever running.
 */
void worker_remove_jobs_by_cb(worker_match_cb cb, void *opaque)
{
	struct list_head *pos, *following;

	worker_lock();

	for (pos = worker_job_head.next; pos != &worker_job_head; pos = following) {
		struct worker_job *queued = container_of(pos, struct worker_job,
				node);

		/* remember the successor first: the entry may be freed below */
		following = pos->next;
		if (!cb(queued->type, queued->data, opaque))
			continue;

		list_del(&queued->node);
		queued->free_cb(queued->data);
		free(queued);
	}

	/* wait current job to finish or cancel if it's of the specified type */
	if (cur_job && cb(cur_job->type, cur_job->data, opaque)) {
		cancel_current = 1;
		do {
			pthread_cond_wait(&worker_cond, &worker_mutex);
		} while (cancel_current);
	}

	worker_unlock();
}
189 
worker_has_job_by_type(uint32_t pat)190 int worker_has_job_by_type(uint32_t pat)
191 {
192 	return worker_has_job_by_cb(worker_matches_type, &pat);
193 }
194 
/*
 * Report whether any job accepted by cb exists, either still queued or
 * currently being executed by the worker thread.
 *
 * cb is called as cb(type, data, opaque) with the worker lock held; a
 * non-zero result counts as a match.
 *
 * Returns 1 if at least one job matched, 0 otherwise.
 */
int worker_has_job_by_cb(worker_match_cb cb, void *opaque)
{
	struct worker_job *job;
	int has_job = 0;

	worker_lock();
	list_for_each_entry(job, &worker_job_head, node) {
		if (cb(job->type, job->data, opaque)) {
			has_job = 1;
			break;
		}
	}
	/*
	 * The running job has already been unlinked from the queue, so it is
	 * tested separately.  This must go through cur_job: when the loop
	 * above runs to completion, job no longer refers to a real entry.
	 */
	if (!has_job && cur_job && cb(cur_job->type, cur_job->data, opaque))
		has_job = 1;
	worker_unlock();
	return has_job;
}
212 
213 /*
214  * this is only called from the worker thread
215  * cur_job is guaranteed to be non-NULL
216  */
worker_cancelling(void)217 int worker_cancelling(void)
218 {
219 	return cancel_current;
220 }
221