1 /*****************************************************************************
2 # #
3 # uStreamer - Lightweight and fast MJPG-HTTP streamer. #
4 # #
5 # Copyright (C) 2018-2021 Maxim Devaev <mdevaev@gmail.com> #
6 # #
7 # This program is free software: you can redistribute it and/or modify #
8 # it under the terms of the GNU General Public License as published by #
9 # the Free Software Foundation, either version 3 of the License, or #
10 # (at your option) any later version. #
11 # #
12 # This program is distributed in the hope that it will be useful, #
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of #
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
15 # GNU General Public License for more details. #
16 # #
17 # You should have received a copy of the GNU General Public License #
18 # along with this program. If not, see <https://www.gnu.org/licenses/>. #
19 # #
20 *****************************************************************************/
21
22
23 #include "workers.h"
24
25
26 static void *_worker_thread(void *v_worker);
27
28
workers_pool_init(const char * name,const char * wr_prefix,unsigned n_workers,long double desired_interval,workers_pool_job_init_f job_init,void * job_init_arg,workers_pool_job_destroy_f job_destroy,workers_pool_run_job_f run_job)29 workers_pool_s *workers_pool_init(
30 const char *name, const char *wr_prefix, unsigned n_workers, long double desired_interval,
31 workers_pool_job_init_f job_init, void *job_init_arg,
32 workers_pool_job_destroy_f job_destroy,
33 workers_pool_run_job_f run_job) {
34
35 LOG_INFO("Creating pool %s with %u workers ...", name, n_workers);
36
37 workers_pool_s *pool;
38 A_CALLOC(pool, 1);
39 pool->name = name;
40 pool->desired_interval = desired_interval;
41 pool->job_destroy = job_destroy;
42 pool->run_job = run_job;
43
44 atomic_init(&pool->stop, false);
45
46 pool->n_workers = n_workers;
47 A_CALLOC(pool->workers, pool->n_workers);
48
49 A_MUTEX_INIT(&pool->free_workers_mutex);
50 A_COND_INIT(&pool->free_workers_cond);
51
52 for (unsigned number = 0; number < pool->n_workers; ++number) {
53 # define WR(_next) pool->workers[number]._next
54
55 WR(number) = number;
56 A_ASPRINTF(WR(name), "%s-%u", wr_prefix, number);
57
58 A_MUTEX_INIT(&WR(has_job_mutex));
59 atomic_init(&WR(has_job), false);
60 A_COND_INIT(&WR(has_job_cond));
61
62 WR(pool) = pool;
63 WR(job) = job_init(job_init_arg);
64
65 A_THREAD_CREATE(&WR(tid), _worker_thread, (void *)&(pool->workers[number]));
66 pool->free_workers += 1;
67
68 # undef WR
69 }
70 return pool;
71 }
72
/** Stop all workers, join their threads and release every pool resource.
 *
 * Raising the stop flag before signaling guarantees that a woken worker
 * skips the fake job and exits its loop ("final job: die"). */
void workers_pool_destroy(workers_pool_s *pool) {
	LOG_INFO("Destroying workers pool %s ...", pool->name);

	atomic_store(&pool->stop, true);
	for (unsigned index = 0; index < pool->n_workers; ++index) {
		worker_s *const wr = &pool->workers[index];

		// Wake the worker with a poisoned has_job: stop is already set,
		// so it will terminate instead of running a job.
		A_MUTEX_LOCK(&wr->has_job_mutex);
		atomic_store(&wr->has_job, true);
		A_MUTEX_UNLOCK(&wr->has_job_mutex);
		A_COND_SIGNAL(&wr->has_job_cond);

		A_THREAD_JOIN(wr->tid);
		A_MUTEX_DESTROY(&wr->has_job_mutex);
		A_COND_DESTROY(&wr->has_job_cond);

		free(wr->name);
		pool->job_destroy(wr->job);
	}

	A_MUTEX_DESTROY(&pool->free_workers_mutex);
	A_COND_DESTROY(&pool->free_workers_cond);

	free(pool->workers);
	free(pool);
}
102
workers_pool_wait(workers_pool_s * pool)103 worker_s *workers_pool_wait(workers_pool_s *pool) {
104 worker_s *ready_wr = NULL;
105
106 A_MUTEX_LOCK(&pool->free_workers_mutex);
107 A_COND_WAIT_TRUE(pool->free_workers, &pool->free_workers_cond, &pool->free_workers_mutex);
108 A_MUTEX_UNLOCK(&pool->free_workers_mutex);
109
110 if (pool->oldest_wr && !atomic_load(&pool->oldest_wr->has_job)) {
111 ready_wr = pool->oldest_wr;
112 ready_wr->job_timely = true;
113 pool->oldest_wr = pool->oldest_wr->next_wr;
114 } else {
115 for (unsigned number = 0; number < pool->n_workers; ++number) {
116 if (
117 !atomic_load(&pool->workers[number].has_job) && (
118 ready_wr == NULL
119 || ready_wr->job_start_ts < pool->workers[number].job_start_ts
120 )
121 ) {
122 ready_wr = &pool->workers[number];
123 break;
124 }
125 }
126 assert(ready_wr != NULL);
127 ready_wr->job_timely = false; // Освободился воркер, получивший задание позже (или самый первый при самом первом захвате)
128 }
129 return ready_wr;
130 }
131
/** Hand a new job to ready_wr and wake it up.
 *
 * The worker is moved to the tail of the intrusive busy list, so
 * pool->oldest_wr always points at the worker with the oldest job. */
void workers_pool_assign(workers_pool_s *pool, worker_s *ready_wr) {
	if (pool->oldest_wr == NULL) {
		// First assignment ever: the list is just this one worker.
		pool->oldest_wr = ready_wr;
		pool->latest_wr = ready_wr;
	} else {
		// Unlink the worker from wherever it sits in the list ...
		if (ready_wr->next_wr != NULL) {
			ready_wr->next_wr->prev_wr = ready_wr->prev_wr;
		}
		if (ready_wr->prev_wr != NULL) {
			ready_wr->prev_wr->next_wr = ready_wr->next_wr;
		}
		// ... and append it at the tail as the latest one.
		ready_wr->prev_wr = pool->latest_wr;
		pool->latest_wr->next_wr = ready_wr;
		pool->latest_wr = ready_wr;
	}
	pool->latest_wr->next_wr = NULL;

	// Publish the job flag under the mutex, then wake the worker.
	A_MUTEX_LOCK(&ready_wr->has_job_mutex);
	atomic_store(&ready_wr->has_job, true);
	A_MUTEX_UNLOCK(&ready_wr->has_job_mutex);
	A_COND_SIGNAL(&ready_wr->has_job_cond);

	A_MUTEX_LOCK(&pool->free_workers_mutex);
	pool->free_workers -= 1;
	A_MUTEX_UNLOCK(&pool->free_workers_mutex);
}
159
/** Compute the delay before the next capture, smoothing the frame rate.
 *
 * Maintains an exponential moving average of the job duration
 * (90% history, 10% newest sample) and spreads it across the workers. */
long double workers_pool_get_fluency_delay(workers_pool_s *pool, worker_s *ready_wr) {
	const long double approx_job_time = pool->approx_job_time * 0.9 + ready_wr->last_job_time * 0.1;

	LOG_VERBOSE("Correcting pool's %s approx_job_time: %.3Lf -> %.3Lf (last_job_time=%.3Lf)",
		pool->name, pool->approx_job_time, approx_job_time, ready_wr->last_job_time);
	pool->approx_job_time = approx_job_time;

	// The average job time is spread across N workers.
	const long double min_delay = pool->approx_job_time / pool->n_workers;

	if (pool->desired_interval > 0 && min_delay > 0 && pool->desired_interval > min_delay) {
		// Artificial delay derived from the desired FPS (--desired-fps),
		// used when the hardware FPS doesn't hit the desired value exactly.
		return pool->desired_interval;
	}
	return min_delay;
}
177
_worker_thread(void * v_worker)178 static void *_worker_thread(void *v_worker) {
179 worker_s *wr = (worker_s *)v_worker;
180
181 A_THREAD_RENAME("%s", wr->name);
182 LOG_DEBUG("Hello! I am a worker %s ^_^", wr->name);
183
184 while (!atomic_load(&wr->pool->stop)) {
185 LOG_DEBUG("Worker %s waiting for a new job ...", wr->name);
186
187 A_MUTEX_LOCK(&wr->has_job_mutex);
188 A_COND_WAIT_TRUE(atomic_load(&wr->has_job), &wr->has_job_cond, &wr->has_job_mutex);
189 A_MUTEX_UNLOCK(&wr->has_job_mutex);
190
191 if (!atomic_load(&wr->pool->stop)) {
192 long double job_start_ts = get_now_monotonic();
193 wr->job_failed = !wr->pool->run_job(wr);
194 if (!wr->job_failed) {
195 wr->job_start_ts = job_start_ts;
196 wr->last_job_time = get_now_monotonic() - wr->job_start_ts;
197 }
198 //wr->job = NULL;
199 atomic_store(&wr->has_job, false);
200 }
201
202 A_MUTEX_LOCK(&wr->pool->free_workers_mutex);
203 wr->pool->free_workers += 1;
204 A_MUTEX_UNLOCK(&wr->pool->free_workers_mutex);
205 A_COND_SIGNAL(&wr->pool->free_workers_cond);
206 }
207
208 LOG_DEBUG("Bye-bye (worker %s)", wr->name);
209 return NULL;
210 }
211