1 /*
2  *  Copyright (c) 2017 The WebM project authors. All Rights Reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include <assert.h>
12 
13 #include "vp9/encoder/vp9_encoder.h"
14 #include "vp9/encoder/vp9_ethread.h"
15 #include "vp9/encoder/vp9_multi_thread.h"
16 #include "vp9/encoder/vp9_temporal_filter.h"
17 
// Removes the entry at the head of the job queue belonging to tile `tile_id`.
//
// The head pointer is read and advanced, and the queue's num_jobs_acquired
// counter is incremented. When CONFIG_MULTITHREAD is enabled these steps are
// performed while holding the tile's job_mutex.
//
// Returns a pointer to the JobNode stored in the dequeued entry, or NULL when
// the queue head is NULL (no entries left).
void *vp9_enc_grp_get_next_job(MultiThreadHandle *multi_thread_ctxt,
                               int tile_id) {
  RowMTInfo *const tile_info = &multi_thread_ctxt->row_mt_info[tile_id];
  JobQueueHandle *const queue_hdl = &tile_info->job_queue_hdl;
  JobQueue *head;
  JobNode *job = NULL;

#if CONFIG_MULTITHREAD
  // Queue state is shared between workers; serialize access to it.
  pthread_mutex_lock(&tile_info->job_mutex);
#endif

  head = (JobQueue *)queue_hdl->next;
  if (head != NULL) {
    job = &head->job_info;
    // Advance the queue head to the following entry.
    queue_hdl->next = head->next;
    queue_hdl->num_jobs_acquired++;
  }

#if CONFIG_MULTITHREAD
  pthread_mutex_unlock(&tile_info->job_mutex);
#endif

  return job;
}
53 
// Allocates this_tile->row_base_thresh_freq_fact and sets every entry to
// RD_THRESH_INIT_FACT.
//
// The table has sb_rows * BLOCK_SIZES * MAX_MODES entries, where sb_rows is
// (mi_cols_aligned_to_sb(cm->mi_rows) >> MI_BLOCK_SIZE_LOG2) + 1.
//
// Any previous value of the pointer is overwritten without being freed, so
// the caller is responsible for releasing an earlier table first.
void vp9_row_mt_alloc_rd_thresh(VP9_COMP *const cpi,
                                TileDataEnc *const this_tile) {
  VP9_COMMON *const cm = &cpi->common;
  const int sb_rows =
      (mi_cols_aligned_to_sb(cm->mi_rows) >> MI_BLOCK_SIZE_LOG2) + 1;
  const int num_entries = sb_rows * BLOCK_SIZES * MAX_MODES;
  int *table;
  int idx;

  this_tile->row_base_thresh_freq_fact = (int *)vpx_calloc(
      num_entries, sizeof(*(this_tile->row_base_thresh_freq_fact)));

  // NOTE(review): the vpx_calloc() result is written to without a NULL check.
  table = this_tile->row_base_thresh_freq_fact;
  for (idx = 0; idx < num_entries; ++idx) {
    table[idx] = RD_THRESH_INIT_FACT;
  }
}
67 
// Allocates the resources used by row-based multi-threading and records the
// allocated dimensions in cpi->multi_thread_ctxt.
//
// Steps, in order:
//   1. Record tile_cols, tile_rows and the per-tile-column job capacity.
//   2. Allocate the shared job queue buffer (32-byte aligned).
//   3. With CONFIG_MULTITHREAD, initialize one job mutex per tile column.
//   4. For each tile in tile row 0, allocate its row_mt_sync data and, when
//      cpi->sf.adaptive_rd_thresh_row_mt is set, (re)allocate its
//      row_base_thresh_freq_fact table.
//   5. Copy the row_mt_sync of the tile-row-0 tile into the tiles of the same
//      column in every other tile row.
//   6. Store get_num_vert_units() of the first tile of each tile row in
//      num_tile_vert_sbs[].
void vp9_row_mt_mem_alloc(VP9_COMP *cpi) {
  struct VP9Common *cm = &cpi->common;
  MultiThreadHandle *const mt_ctxt = &cpi->multi_thread_ctxt;
  const int tile_cols = 1 << cm->log2_tile_cols;
  const int tile_rows = 1 << cm->log2_tile_rows;
  const int sb_rows = mi_cols_aligned_to_sb(cm->mi_rows) >> MI_BLOCK_SIZE_LOG2;
  // Allocate memory that is large enough for all row_mt stages. First pass
  // uses 16x16 block size.
  const int jobs_per_tile_col = VPXMAX(cm->mb_rows, sb_rows);
  // Total number of jobs across all tile columns.
  const int total_jobs = jobs_per_tile_col * tile_cols;
  int row, col;

  mt_ctxt->allocated_tile_cols = tile_cols;
  mt_ctxt->allocated_tile_rows = tile_rows;
  mt_ctxt->allocated_vert_unit_rows = jobs_per_tile_col;

  // NOTE(review): the vpx_memalign() result is stored without a NULL check.
  mt_ctxt->job_queue =
      (JobQueue *)vpx_memalign(32, total_jobs * sizeof(JobQueue));

#if CONFIG_MULTITHREAD
  // One job mutex per tile column.
  for (col = 0; col < tile_cols; ++col) {
    pthread_mutex_init(&mt_ctxt->row_mt_info[col].job_mutex, NULL);
  }
#endif

  // Per-tile allocations for the tiles of tile row 0.
  for (col = 0; col < tile_cols; ++col) {
    TileDataEnc *const tile = &cpi->tile_data[col];

    vp9_row_mt_sync_mem_alloc(&tile->row_mt_sync, cm, jobs_per_tile_col);
    if (!cpi->sf.adaptive_rd_thresh_row_mt) continue;

    // Drop any table left from an earlier allocation before making a new one.
    if (tile->row_base_thresh_freq_fact != NULL) {
      vpx_free(tile->row_base_thresh_freq_fact);
      tile->row_base_thresh_freq_fact = NULL;
    }
    vp9_row_mt_alloc_rd_thresh(cpi, tile);
  }

  // Tiles in tile rows > 0 reuse the sync data of the tile-row-0 tile in the
  // same column.
  for (row = 1; row < tile_rows; ++row) {
    TileDataEnc *const row_tiles = &cpi->tile_data[row * tile_cols];
    for (col = 0; col < tile_cols; ++col) {
      row_tiles[col].row_mt_sync = cpi->tile_data[col].row_mt_sync;
    }
  }

  // Number of vertical units in each tile row, taken from the row's first
  // tile.
  for (row = 0; row < tile_rows; ++row) {
    TileInfo *const first_tile_info = &cpi->tile_data[row * tile_cols].tile_info;
    mt_ctxt->num_tile_vert_sbs[row] =
        get_num_vert_units(*first_tile_info, MI_BLOCK_SIZE_LOG2);
  }
}
128 
// Releases the resources set up for row-based multi-threading: the job queue
// buffer, the per-tile-column job mutexes (CONFIG_MULTITHREAD only), the
// row_mt_sync data of the tiles in tile row 0, and every tile's
// row_base_thresh_freq_fact table.
//
// The extent of the work is taken from the allocated_tile_cols /
// allocated_tile_rows counters in cpi->multi_thread_ctxt. On return the job
// queue pointer is NULL and the allocated_* counters are 0, so calling this
// function again (before a new allocation) neither frees the queue twice nor
// destroys the mutexes a second time.
void vp9_row_mt_mem_dealloc(VP9_COMP *cpi) {
  MultiThreadHandle *multi_thread_ctxt = &cpi->multi_thread_ctxt;
  int tile_row, tile_col;

  // Deallocate memory for job queue. Clear the pointer so that it does not
  // dangle and a repeated call cannot free it again.
  if (multi_thread_ctxt->job_queue) {
    vpx_free(multi_thread_ctxt->job_queue);
    multi_thread_ctxt->job_queue = NULL;
  }

#if CONFIG_MULTITHREAD
  // Destroy mutex for each tile
  for (tile_col = 0; tile_col < multi_thread_ctxt->allocated_tile_cols;
       tile_col++) {
    RowMTInfo *row_mt_info = &multi_thread_ctxt->row_mt_info[tile_col];
    if (row_mt_info) pthread_mutex_destroy(&row_mt_info->job_mutex);
  }
#endif

  // Free row based multi-threading sync memory
  for (tile_col = 0; tile_col < multi_thread_ctxt->allocated_tile_cols;
       tile_col++) {
    TileDataEnc *this_tile = &cpi->tile_data[tile_col];
    vp9_row_mt_sync_mem_dealloc(&this_tile->row_mt_sync);
  }

  // Free the per-tile rd threshold tables. The allocation path creates them
  // regardless of CONFIG_MULTITHREAD, so they are released unconditionally
  // here as well to avoid leaking them in non-multithread builds.
  for (tile_row = 0; tile_row < multi_thread_ctxt->allocated_tile_rows;
       tile_row++) {
    for (tile_col = 0; tile_col < multi_thread_ctxt->allocated_tile_cols;
         tile_col++) {
      TileDataEnc *this_tile =
          &cpi->tile_data[tile_row * multi_thread_ctxt->allocated_tile_cols +
                          tile_col];
      if (this_tile->row_base_thresh_freq_fact != NULL) {
        vpx_free(this_tile->row_base_thresh_freq_fact);
        this_tile->row_base_thresh_freq_fact = NULL;
      }
    }
  }

  // Nothing is allocated any more; make that visible to later size checks and
  // to any repeated call of this function.
  multi_thread_ctxt->allocated_tile_cols = 0;
  multi_thread_ctxt->allocated_tile_rows = 0;
  multi_thread_ctxt->allocated_vert_unit_rows = 0;
}
171 
// Resets the per-tile state of every tile in tile row 0 before a row-mt run.
//
// For each of the (1 << cm->log2_tile_cols) tiles:
//   - the first N entries of row_mt_sync.cur_col are filled with -1 bytes,
//     where N is cm->mb_rows when cpi->oxcf.pass == 1 and sb_rows otherwise;
//   - fp_data is zeroed and its image_data_start_row is set to INVALID_ROW.
void vp9_multi_thread_tile_init(VP9_COMP *cpi) {
  VP9_COMMON *const cm = &cpi->common;
  const int tile_cols = 1 << cm->log2_tile_cols;
  const int sb_rows = mi_cols_aligned_to_sb(cm->mi_rows) >> MI_BLOCK_SIZE_LOG2;
  // The row count is the same for every tile, so compute it once.
  const int rows_to_reset = (cpi->oxcf.pass == 1) ? cm->mb_rows : sb_rows;
  int col = 0;

  while (col < tile_cols) {
    TileDataEnc *const tile = &cpi->tile_data[col];

    // Initialize cur_col to -1 for all rows.
    memset(tile->row_mt_sync.cur_col, -1,
           sizeof(*tile->row_mt_sync.cur_col) * rows_to_reset);

    vp9_zero(tile->fp_data);
    tile->fp_data.image_data_start_row = INVALID_ROW;

    ++col;
  }
}
189 
// Fills thread_id_to_tile_id[0 .. num_workers - 1] with tile ids assigned in
// round-robin order: 0, 1, ..., tile_cols - 1, 0, 1, ...
void vp9_assign_tile_to_thread(MultiThreadHandle *multi_thread_ctxt,
                               int tile_cols, int num_workers) {
  int next_tile = 0;
  int worker;

  for (worker = 0; worker < num_workers; ++worker) {
    multi_thread_ctxt->thread_id_to_tile_id[worker] = next_tile;
    // Wrap around once the last tile column has been handed out.
    if (++next_tile == tile_cols) next_tile = 0;
  }
}
201 
// Returns how many jobs of tile `cur_tile_id` have not been acquired yet,
// computed as jobs_per_tile_col minus the queue's num_jobs_acquired counter.
//
// When CONFIG_MULTITHREAD is enabled the counter is read while holding the
// tile's job_mutex.
int vp9_get_job_queue_status(MultiThreadHandle *multi_thread_ctxt,
                             int cur_tile_id) {
  RowMTInfo *const tile_info = &multi_thread_ctxt->row_mt_info[cur_tile_id];
  int jobs_left;

#if CONFIG_MULTITHREAD
  pthread_mutex_lock(&tile_info->job_mutex);
#endif

  jobs_left = multi_thread_ctxt->jobs_per_tile_col -
              tile_info->job_queue_hdl.num_jobs_acquired;

#if CONFIG_MULTITHREAD
  pthread_mutex_unlock(&tile_info->job_mutex);
#endif

  return jobs_left;
}
228 
// Builds one singly linked job queue per tile column for the requested stage
// and resets the tile completion flags of every worker.
//
// The number of jobs per tile column depends on job_type:
//   ENCODE_JOB     - sb_rows, i.e.
//                    mi_cols_aligned_to_sb(cm->mi_rows) >> MI_BLOCK_SIZE_LOG2
//   FIRST_PASS_JOB - cm->mb_rows
//   ARNR_JOB       - (cm->mi_rows + TF_ROUND) >> TF_SHIFT
// The value is also stored in multi_thread_ctxt->jobs_per_tile_col.
//
// Queue storage comes from multi_thread_ctxt->job_queue: tile column N uses
// the N-th run of jobs_per_tile_col consecutive entries. Each entry records
// its row number, tile column and tile row. For ENCODE_JOB the tile row is
// advanced using num_tile_vert_sbs[]; for the other job types it stays 0.
//
// If the job count is not positive (for example an unexpected job_type in a
// build where assert() is compiled out), every tile queue is marked empty
// instead of writing a list terminator in front of the queue buffer.
void vp9_prepare_job_queue(VP9_COMP *cpi, JOB_TYPE job_type) {
  VP9_COMMON *const cm = &cpi->common;
  MultiThreadHandle *multi_thread_ctxt = &cpi->multi_thread_ctxt;
  JobQueue *job_queue = multi_thread_ctxt->job_queue;
  const int tile_cols = 1 << cm->log2_tile_cols;
  int job_row_num, jobs_per_tile, jobs_per_tile_col = 0, total_jobs;
  const int sb_rows = mi_cols_aligned_to_sb(cm->mi_rows) >> MI_BLOCK_SIZE_LOG2;
  int tile_col, i;

  switch (job_type) {
    case ENCODE_JOB: jobs_per_tile_col = sb_rows; break;
    case FIRST_PASS_JOB: jobs_per_tile_col = cm->mb_rows; break;
    case ARNR_JOB:
      jobs_per_tile_col = ((cm->mi_rows + TF_ROUND) >> TF_SHIFT);
      break;
    default: assert(0); break;
  }

  multi_thread_ctxt->jobs_per_tile_col = jobs_per_tile_col;

  if (jobs_per_tile_col > 0) {
    total_jobs = jobs_per_tile_col * tile_cols;
    // memset the entire job queue buffer to zero
    memset(job_queue, 0, total_jobs * sizeof(JobQueue));
  }

  // Job queue preparation
  for (tile_col = 0; tile_col < tile_cols; tile_col++) {
    RowMTInfo *tile_ctxt = &multi_thread_ctxt->row_mt_info[tile_col];
    JobQueue *job_queue_curr;
    int tile_row = 0;

    tile_ctxt->job_queue_hdl.num_jobs_acquired = 0;

    if (jobs_per_tile_col <= 0) {
      // No jobs for this stage: expose an empty queue. Terminating the "last"
      // entry here would write to the element before the buffer.
      tile_ctxt->job_queue_hdl.next = (void *)NULL;
      continue;
    }

    tile_ctxt->job_queue_hdl.next = (void *)job_queue;
    job_queue_curr = job_queue;

    // loop over all the vertical rows
    for (job_row_num = 0, jobs_per_tile = 0; job_row_num < jobs_per_tile_col;
         job_row_num++, jobs_per_tile++) {
      job_queue_curr->job_info.vert_unit_row_num = job_row_num;
      job_queue_curr->job_info.tile_col_id = tile_col;
      job_queue_curr->job_info.tile_row_id = tile_row;
      // Link to the entry that follows in the buffer; the final link is
      // replaced by NULL after the loop.
      job_queue_curr->next = (void *)(job_queue_curr + 1);
      job_queue_curr++;

      if (ENCODE_JOB == job_type) {
        // Once all vertical units of the current tile row are queued, move on
        // to the next tile row. jobs_per_tile is set to -1 so that the loop
        // increment brings it back to 0 for the first job of the new row.
        if (jobs_per_tile >=
            multi_thread_ctxt->num_tile_vert_sbs[tile_row] - 1) {
          tile_row++;
          jobs_per_tile = -1;
        }
      }
    }

    // Set the last pointer to NULL
    (job_queue_curr - 1)->next = (void *)NULL;

    // Move to the next tile
    job_queue += jobs_per_tile_col;
  }

  for (i = 0; i < cpi->num_workers; i++) {
    EncWorkerData *thread_data;
    thread_data = &cpi->tile_thr_data[i];
    thread_data->thread_id = i;

    for (tile_col = 0; tile_col < tile_cols; tile_col++)
      thread_data->tile_completion_status[tile_col] = 0;
  }
}
300 
// Picks the next tile column for a worker once it is done with *cur_tile_id.
//
// The current tile is marked complete in tile_completion_status[]. Every tile
// not yet marked complete is then queried with vp9_get_job_queue_status():
// tiles reporting 0 remaining jobs are marked complete, and the tile with the
// largest positive number of remaining jobs is selected (the lowest index
// wins ties).
//
// Returns 1 if no tile with remaining jobs was found; *cur_tile_id is then
// left unchanged. Otherwise stores the selected tile in *cur_tile_id and
// returns 0.
int vp9_get_tiles_proc_status(MultiThreadHandle *multi_thread_ctxt,
                              int *tile_completion_status, int *cur_tile_id,
                              int tile_cols) {
  int best_tile = -1;  // Tile with the most jobs still pending, if any
  int most_jobs_left = 0;
  int col;

  // Mark the completion to avoid check in the loop
  tile_completion_status[*cur_tile_id] = 1;

  for (col = 0; col < tile_cols; ++col) {
    int jobs_left;

    if (tile_completion_status[col] != 0) continue;

    jobs_left = vp9_get_job_queue_status(multi_thread_ctxt, col);

    // Mark the completion to avoid checks during future switches across tiles
    if (jobs_left == 0) tile_completion_status[col] = 1;

    if (jobs_left > most_jobs_left) {
      most_jobs_left = jobs_left;
      best_tile = col;
    }
  }

  if (best_tile == -1) return 1;

  // Switch to the least processed tile.
  *cur_tile_id = best_tile;
  return 0;
}
334