/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use super::{CommandBuffer, CommandEncoderStatus};
use crate::{
    device::DeviceError, hub::GfxBackend, id::DeviceId, track::TrackerSet, FastHashMap,
    PrivateFeatures, Stored, SubmissionIndex,
};

#[cfg(debug_assertions)]
use crate::LabelHelpers;

use hal::{command::CommandBuffer as _, device::Device as _, pool::CommandPool as _};
use parking_lot::Mutex;
use thiserror::Error;

use std::thread;

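/// How many backend command buffers to allocate from a pool in one batch
/// whenever a pool's free list runs dry.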
const GROW_AMOUNT: usize = 20;

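/// A per-thread pool of backend command buffers.
///
/// `total` counts every buffer ever allocated from `raw`; `available` holds
/// buffers that are free for reuse, and `pending` holds submitted buffers
/// together with the submission index after which each one may be recycled.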
#[derive(Debug)]
struct CommandPool<B: hal::Backend> {
    raw: B::CommandPool,
    total: usize,
    available: Vec<B::CommandBuffer>,
    pending: Vec<(B::CommandBuffer, SubmissionIndex)>,
}

impl<B: hal::Backend> CommandPool<B> {
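    /// Recycle every pending command buffer whose submission has completed,
    /// i.e. whose submission index is at most `last_done_index`.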
    fn maintain(&mut self, last_done_index: SubmissionIndex) {
        for i in (0..self.pending.len()).rev() {
            if self.pending[i].1 <= last_done_index {
                let (cmd_buf, index) = self.pending.swap_remove(i);
                log::trace!(
                    "recycling cmdbuf submitted in {} when {} is last done",
                    index,
                    last_done_index,
                );
                self.recycle(cmd_buf);
            }
        }
    }

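    /// Reset a command buffer (keeping its resources attached) and put it
    /// back on the free list.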
    fn recycle(&mut self, mut raw: B::CommandBuffer) {
        unsafe {
            raw.reset(false);
        }
        self.available.push(raw);
    }

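    /// Take a command buffer from the free list, first growing the pool by
    /// `GROW_AMOUNT` primary-level buffers if the list is empty.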
    fn allocate(&mut self) -> B::CommandBuffer {
        if self.available.is_empty() {
            self.total += GROW_AMOUNT;
            unsafe {
                self.raw.allocate(
                    GROW_AMOUNT,
                    hal::command::Level::Primary,
                    &mut self.available,
                )
            };
        }
        self.available.pop().unwrap()
    }

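    /// Free the buffers on the free list and destroy the backend pool.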
    fn destroy(mut self, device: &B::Device) {
        unsafe {
            self.raw.free(self.available.into_iter());
            device.destroy_command_pool(self.raw);
        }
    }
}

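/// The mutex-guarded state of a [`CommandAllocator`]: one `CommandPool` per
/// thread that has allocated command buffers.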
#[derive(Debug)]
struct Inner<B: hal::Backend> {
    pools: FastHashMap<thread::ThreadId, CommandPool<B>>,
}

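/// Allocates raw backend command buffers for `queue_family` out of
/// per-thread pools.
///
/// The pool belonging to `internal_thread_id` (the thread the allocator was
/// created on) lives as long as the allocator; pools for other threads are
/// created on demand and reaped by [`CommandAllocator::maintain`] once all
/// of their buffers are back on the free list.
///
/// A minimal usage sketch (not compiled here; it assumes a `device`, its
/// `queue_family`, and submission indices are already at hand):
///
/// ```ignore
/// let allocator = CommandAllocator::<B>::new(queue_family, &device)?;
/// let raw = allocator.allocate_internal();
/// // ... record and submit `raw` as part of submission `submit_index` ...
/// allocator.after_submit_internal(raw, submit_index);
/// // Later, once the GPU has finished `last_done_index`:
/// allocator.maintain(&device, last_done_index);
/// ```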
#[derive(Debug)]
pub struct CommandAllocator<B: hal::Backend> {
    queue_family: hal::queue::QueueFamilyId,
    internal_thread_id: thread::ThreadId,
    inner: Mutex<Inner<B>>,
}

impl<B: GfxBackend> CommandAllocator<B> {
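    /// Allocate a new `CommandBuffer` for the current thread, creating that
    /// thread's `CommandPool` on first use.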
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn allocate(
        &self,
        device_id: Stored<DeviceId>,
        device: &B::Device,
        limits: wgt::Limits,
        downlevel: wgt::DownlevelProperties,
        features: wgt::Features,
        private_features: PrivateFeatures,
        label: &crate::Label,
        #[cfg(feature = "trace")] enable_tracing: bool,
    ) -> Result<CommandBuffer<B>, CommandAllocatorError> {
        //debug_assert_eq!(device_id.backend(), B::VARIANT);
        let thread_id = thread::current().id();
        let mut inner = self.inner.lock();

        use std::collections::hash_map::Entry;
        let pool = match inner.pools.entry(thread_id) {
            Entry::Vacant(e) => {
                log::info!("Starting on thread {:?}", thread_id);
                let raw = unsafe {
                    device
                        .create_command_pool(
                            self.queue_family,
                            hal::pool::CommandPoolCreateFlags::RESET_INDIVIDUAL,
                        )
                        .or(Err(DeviceError::OutOfMemory))?
                };
                e.insert(CommandPool {
                    raw,
                    total: 0,
                    available: Vec::new(),
                    pending: Vec::new(),
                })
            }
            Entry::Occupied(e) => e.into_mut(),
        };

        // Note: we have to allocate the first buffer right here; otherwise
        // the pool may be cleaned up by `maintain` running on another thread.

        Ok(CommandBuffer {
            raw: vec![pool.allocate()],
            status: CommandEncoderStatus::Recording,
            recorded_thread_id: thread_id,
            device_id,
            trackers: TrackerSet::new(B::VARIANT),
            used_swap_chains: Default::default(),
            buffer_memory_init_actions: Default::default(),
            limits,
            downlevel,
            private_features,
            support_fill_buffer_texture: features.contains(wgt::Features::CLEAR_COMMANDS),
            has_labels: label.is_some(),
            #[cfg(feature = "trace")]
            commands: if enable_tracing {
                Some(Vec::new())
            } else {
                None
            },
            #[cfg(debug_assertions)]
            label: label.to_string_or_default(),
        })
    }
}

impl<B: hal::Backend> CommandAllocator<B> {
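    /// Create an allocator for `queue_family`, eagerly creating the pool for
    /// the current (internal) thread.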
    pub fn new(
        queue_family: hal::queue::QueueFamilyId,
        device: &B::Device,
    ) -> Result<Self, CommandAllocatorError> {
        let internal_thread_id = thread::current().id();
        log::info!("Starting on (internal) thread {:?}", internal_thread_id);
        let mut pools = FastHashMap::default();
        pools.insert(
            internal_thread_id,
            CommandPool {
                raw: unsafe {
                    device
                        .create_command_pool(
                            queue_family,
                            hal::pool::CommandPoolCreateFlags::RESET_INDIVIDUAL,
                        )
                        .or(Err(DeviceError::OutOfMemory))?
                },
                total: 0,
                available: Vec::new(),
                pending: Vec::new(),
            },
        );
        Ok(Self {
            queue_family,
            internal_thread_id,
            inner: Mutex::new(Inner { pools }),
        })
    }

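    /// Allocate a raw command buffer from the pool owned by `thread_id`.
    ///
    /// Panics if no pool exists for that thread.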
    fn allocate_for_thread_id(&self, thread_id: thread::ThreadId) -> B::CommandBuffer {
        let mut inner = self.inner.lock();
        inner.pools.get_mut(&thread_id).unwrap().allocate()
    }

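    /// Allocate a raw command buffer from the internal thread's pool.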
    pub fn allocate_internal(&self) -> B::CommandBuffer {
        self.allocate_for_thread_id(self.internal_thread_id)
    }

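    /// Allocate one more raw buffer for `cmd_buf`, drawing from the pool of
    /// the thread that originally recorded it.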
    pub fn extend(&self, cmd_buf: &CommandBuffer<B>) -> B::CommandBuffer {
        self.allocate_for_thread_id(cmd_buf.recorded_thread_id)
    }

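    /// Return an internally allocated raw buffer to the internal pool
    /// without submitting it.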
    pub fn discard_internal(&self, raw: B::CommandBuffer) {
        let mut inner = self.inner.lock();
        inner
            .pools
            .get_mut(&self.internal_thread_id)
            .unwrap()
            .recycle(raw);
    }

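    /// Recycle all raw buffers of a command buffer that will never be
    /// submitted, clearing its trackers first.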
    pub fn discard(&self, mut cmd_buf: CommandBuffer<B>) {
        cmd_buf.trackers.clear();
        let mut inner = self.inner.lock();
        let pool = inner.pools.get_mut(&cmd_buf.recorded_thread_id).unwrap();
        for raw in cmd_buf.raw {
            pool.recycle(raw);
        }
    }

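    /// Mark an internally allocated raw buffer as pending until the
    /// submission `submit_index` completes.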
    pub fn after_submit_internal(&self, raw: B::CommandBuffer, submit_index: SubmissionIndex) {
        let mut inner = self.inner.lock();
        inner
            .pools
            .get_mut(&self.internal_thread_id)
            .unwrap()
            .pending
            .push((raw, submit_index));
    }

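    /// Move all raw buffers of a submitted `CommandBuffer` onto the pending
    /// list of its recording thread's pool, clearing any debug label so the
    /// name does not linger on the buffer's next use.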
    pub fn after_submit(
        &self,
        cmd_buf: CommandBuffer<B>,
        device: &B::Device,
        submit_index: SubmissionIndex,
    ) {
        // Record this command buffer as pending
        let mut inner = self.inner.lock();
        let clear_label = cmd_buf.has_labels;
        inner
            .pools
            .get_mut(&cmd_buf.recorded_thread_id)
            .unwrap()
            .pending
            .extend(cmd_buf.raw.into_iter().map(|mut raw| {
                if clear_label {
                    unsafe { device.set_command_buffer_name(&mut raw, "") };
                }
                (raw, submit_index)
            }));
    }

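    /// Recycle command buffers whose submissions have completed, and destroy
    /// the pool of any non-internal thread once all of its buffers are back
    /// on the free list.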
    pub fn maintain(&self, device: &B::Device, last_done_index: SubmissionIndex) {
        profiling::scope!("maintain", "CommandAllocator");
        let mut inner = self.inner.lock();
        let mut remove_threads = Vec::new();
        for (&thread_id, pool) in inner.pools.iter_mut() {
            pool.maintain(last_done_index);
            if pool.total == pool.available.len() && thread_id != self.internal_thread_id {
                assert!(pool.pending.is_empty());
                remove_threads.push(thread_id);
            }
        }
        for thread_id in remove_threads {
            log::info!("Removing from thread {:?}", thread_id);
            let pool = inner.pools.remove(&thread_id).unwrap();
            pool.destroy(device);
        }
    }

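    /// Destroy all pools, recycling any still-pending buffers first and
    /// logging an error if some buffers are still being recorded.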
    pub fn destroy(self, device: &B::Device) {
        let mut inner = self.inner.lock();
        for (_, mut pool) in inner.pools.drain() {
            while let Some((raw, _)) = pool.pending.pop() {
                pool.recycle(raw);
            }
            if pool.total != pool.available.len() {
                log::error!(
                    "Some command buffers are still recorded, only tracking {} / {}",
                    pool.available.len(),
                    pool.total
                );
            }
            pool.destroy(device);
        }
    }
}

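/// An error produced by [`CommandAllocator`] when creating a backend command
/// pool fails.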
#[derive(Clone, Debug, Error)]
pub enum CommandAllocatorError {
    #[error(transparent)]
    Device(#[from] DeviceError),
}