1 /*****************************************************************************
2 #                                                                            #
3 #    uStreamer - Lightweight and fast MJPG-HTTP streamer.                    #
4 #                                                                            #
5 #    Copyright (C) 2018-2021  Maxim Devaev <mdevaev@gmail.com>               #
6 #                                                                            #
7 #    This program is free software: you can redistribute it and/or modify    #
8 #    it under the terms of the GNU General Public License as published by    #
9 #    the Free Software Foundation, either version 3 of the License, or       #
10 #    (at your option) any later version.                                     #
11 #                                                                            #
12 #    This program is distributed in the hope that it will be useful,         #
13 #    but WITHOUT ANY WARRANTY; without even the implied warranty of          #
14 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the           #
15 #    GNU General Public License for more details.                            #
16 #                                                                            #
17 #    You should have received a copy of the GNU General Public License       #
18 #    along with this program.  If not, see <https://www.gnu.org/licenses/>.  #
19 #                                                                            #
20 *****************************************************************************/
21 
22 
23 #include "workers.h"
24 
25 
26 static void *_worker_thread(void *v_worker);
27 
28 
workers_pool_init(const char * name,const char * wr_prefix,unsigned n_workers,long double desired_interval,workers_pool_job_init_f job_init,void * job_init_arg,workers_pool_job_destroy_f job_destroy,workers_pool_run_job_f run_job)29 workers_pool_s *workers_pool_init(
30 	const char *name, const char *wr_prefix, unsigned n_workers, long double desired_interval,
31 	workers_pool_job_init_f job_init, void *job_init_arg,
32 	workers_pool_job_destroy_f job_destroy,
33 	workers_pool_run_job_f run_job) {
34 
35 	LOG_INFO("Creating pool %s with %u workers ...", name, n_workers);
36 
37 	workers_pool_s *pool;
38 	A_CALLOC(pool, 1);
39 	pool->name = name;
40 	pool->desired_interval = desired_interval;
41 	pool->job_destroy = job_destroy;
42 	pool->run_job = run_job;
43 
44 	atomic_init(&pool->stop, false);
45 
46 	pool->n_workers = n_workers;
47 	A_CALLOC(pool->workers, pool->n_workers);
48 
49 	A_MUTEX_INIT(&pool->free_workers_mutex);
50 	A_COND_INIT(&pool->free_workers_cond);
51 
52 	for (unsigned number = 0; number < pool->n_workers; ++number) {
53 #		define WR(_next) pool->workers[number]._next
54 
55 		WR(number) = number;
56 		A_ASPRINTF(WR(name), "%s-%u", wr_prefix, number);
57 
58 		A_MUTEX_INIT(&WR(has_job_mutex));
59 		atomic_init(&WR(has_job), false);
60 		A_COND_INIT(&WR(has_job_cond));
61 
62 		WR(pool) = pool;
63 		WR(job) = job_init(job_init_arg);
64 
65 		A_THREAD_CREATE(&WR(tid), _worker_thread, (void *)&(pool->workers[number]));
66 		pool->free_workers += 1;
67 
68 #		undef WR
69 	}
70 	return pool;
71 }
72 
workers_pool_destroy(workers_pool_s * pool)73 void workers_pool_destroy(workers_pool_s *pool) {
74 	LOG_INFO("Destroying workers pool %s ...", pool->name);
75 
76 	atomic_store(&pool->stop, true);
77 	for (unsigned number = 0; number < pool->n_workers; ++number) {
78 #		define WR(_next) pool->workers[number]._next
79 
80 		A_MUTEX_LOCK(&WR(has_job_mutex));
81 		atomic_store(&WR(has_job), true); // Final job: die
82 		A_MUTEX_UNLOCK(&WR(has_job_mutex));
83 		A_COND_SIGNAL(&WR(has_job_cond));
84 
85 		A_THREAD_JOIN(WR(tid));
86 		A_MUTEX_DESTROY(&WR(has_job_mutex));
87 		A_COND_DESTROY(&WR(has_job_cond));
88 
89 		free(WR(name));
90 
91 		pool->job_destroy(WR(job));
92 
93 #		undef WR
94 	}
95 
96 	A_MUTEX_DESTROY(&pool->free_workers_mutex);
97 	A_COND_DESTROY(&pool->free_workers_cond);
98 
99 	free(pool->workers);
100 	free(pool);
101 }
102 
workers_pool_wait(workers_pool_s * pool)103 worker_s *workers_pool_wait(workers_pool_s *pool) {
104 	worker_s *ready_wr = NULL;
105 
106 	A_MUTEX_LOCK(&pool->free_workers_mutex);
107 	A_COND_WAIT_TRUE(pool->free_workers, &pool->free_workers_cond, &pool->free_workers_mutex);
108 	A_MUTEX_UNLOCK(&pool->free_workers_mutex);
109 
110 	if (pool->oldest_wr && !atomic_load(&pool->oldest_wr->has_job)) {
111 		ready_wr = pool->oldest_wr;
112 		ready_wr->job_timely = true;
113 		pool->oldest_wr = pool->oldest_wr->next_wr;
114 	} else {
115 		for (unsigned number = 0; number < pool->n_workers; ++number) {
116 			if (
117 				!atomic_load(&pool->workers[number].has_job) && (
118 					ready_wr == NULL
119 					|| ready_wr->job_start_ts < pool->workers[number].job_start_ts
120 				)
121 			) {
122 				ready_wr = &pool->workers[number];
123 				break;
124 			}
125 		}
126 		assert(ready_wr != NULL);
127 		ready_wr->job_timely = false; // Освободился воркер, получивший задание позже (или самый первый при самом первом захвате)
128 	}
129 	return ready_wr;
130 }
131 
workers_pool_assign(workers_pool_s * pool,worker_s * ready_wr)132 void workers_pool_assign(workers_pool_s *pool, worker_s *ready_wr/*, void *job*/) {
133 	if (pool->oldest_wr == NULL) {
134 		pool->oldest_wr = ready_wr;
135 		pool->latest_wr = pool->oldest_wr;
136 	} else {
137 		if (ready_wr->next_wr) {
138 			ready_wr->next_wr->prev_wr = ready_wr->prev_wr;
139 		}
140 		if (ready_wr->prev_wr) {
141 			ready_wr->prev_wr->next_wr = ready_wr->next_wr;
142 		}
143 		ready_wr->prev_wr = pool->latest_wr;
144 		pool->latest_wr->next_wr = ready_wr;
145 		pool->latest_wr = ready_wr;
146 	}
147 	pool->latest_wr->next_wr = NULL;
148 
149 	A_MUTEX_LOCK(&ready_wr->has_job_mutex);
150 	//ready_wr->job = job;
151 	atomic_store(&ready_wr->has_job, true);
152 	A_MUTEX_UNLOCK(&ready_wr->has_job_mutex);
153 	A_COND_SIGNAL(&ready_wr->has_job_cond);
154 
155 	A_MUTEX_LOCK(&pool->free_workers_mutex);
156 	pool->free_workers -= 1;
157 	A_MUTEX_UNLOCK(&pool->free_workers_mutex);
158 }
159 
workers_pool_get_fluency_delay(workers_pool_s * pool,worker_s * ready_wr)160 long double workers_pool_get_fluency_delay(workers_pool_s *pool, worker_s *ready_wr) {
161 	const long double approx_job_time = pool->approx_job_time * 0.9 + ready_wr->last_job_time * 0.1;
162 
163 	LOG_VERBOSE("Correcting pool's %s approx_job_time: %.3Lf -> %.3Lf (last_job_time=%.3Lf)",
164 		pool->name, pool->approx_job_time, approx_job_time, ready_wr->last_job_time);
165 
166 	pool->approx_job_time = approx_job_time;
167 
168 	const long double min_delay = pool->approx_job_time / pool->n_workers; // Среднее время работы размазывается на N воркеров
169 
170 	if (pool->desired_interval > 0 && min_delay > 0 && pool->desired_interval > min_delay) {
171 		// Искусственное время задержки на основе желаемого FPS, если включен --desired-fps
172 		// и аппаратный fps не попадает точно в желаемое значение
173 		return pool->desired_interval;
174 	}
175 	return min_delay;
176 }
177 
_worker_thread(void * v_worker)178 static void *_worker_thread(void *v_worker) {
179 	worker_s *wr = (worker_s *)v_worker;
180 
181 	A_THREAD_RENAME("%s", wr->name);
182 	LOG_DEBUG("Hello! I am a worker %s ^_^", wr->name);
183 
184 	while (!atomic_load(&wr->pool->stop)) {
185 		LOG_DEBUG("Worker %s waiting for a new job ...", wr->name);
186 
187 		A_MUTEX_LOCK(&wr->has_job_mutex);
188 		A_COND_WAIT_TRUE(atomic_load(&wr->has_job), &wr->has_job_cond, &wr->has_job_mutex);
189 		A_MUTEX_UNLOCK(&wr->has_job_mutex);
190 
191 		if (!atomic_load(&wr->pool->stop)) {
192 			long double job_start_ts = get_now_monotonic();
193 			wr->job_failed = !wr->pool->run_job(wr);
194 			if (!wr->job_failed) {
195 				wr->job_start_ts = job_start_ts;
196 				wr->last_job_time = get_now_monotonic() - wr->job_start_ts;
197 			}
198 			//wr->job = NULL;
199 			atomic_store(&wr->has_job, false);
200 		}
201 
202 		A_MUTEX_LOCK(&wr->pool->free_workers_mutex);
203 		wr->pool->free_workers += 1;
204 		A_MUTEX_UNLOCK(&wr->pool->free_workers_mutex);
205 		A_COND_SIGNAL(&wr->pool->free_workers_cond);
206 	}
207 
208 	LOG_DEBUG("Bye-bye (worker %s)", wr->name);
209 	return NULL;
210 }
211