1 /* This Source Code Form is subject to the terms of the Mozilla Public 2 * License, v. 2.0. If a copy of the MPL was not distributed with this 3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 4 5 use super::{CommandBuffer, CommandEncoderStatus}; 6 use crate::{ 7 device::DeviceError, hub::GfxBackend, id::DeviceId, track::TrackerSet, FastHashMap, 8 PrivateFeatures, Stored, SubmissionIndex, 9 }; 10 11 #[cfg(debug_assertions)] 12 use crate::LabelHelpers; 13 14 use hal::{command::CommandBuffer as _, device::Device as _, pool::CommandPool as _}; 15 use parking_lot::Mutex; 16 use thiserror::Error; 17 18 use std::thread; 19 20 const GROW_AMOUNT: usize = 20; 21 22 #[derive(Debug)] 23 struct CommandPool<B: hal::Backend> { 24 raw: B::CommandPool, 25 total: usize, 26 available: Vec<B::CommandBuffer>, 27 pending: Vec<(B::CommandBuffer, SubmissionIndex)>, 28 } 29 30 impl<B: hal::Backend> CommandPool<B> { maintain(&mut self, last_done_index: SubmissionIndex)31 fn maintain(&mut self, last_done_index: SubmissionIndex) { 32 for i in (0..self.pending.len()).rev() { 33 if self.pending[i].1 <= last_done_index { 34 let (cmd_buf, index) = self.pending.swap_remove(i); 35 log::trace!( 36 "recycling cmdbuf submitted in {} when {} is last done", 37 index, 38 last_done_index, 39 ); 40 self.recycle(cmd_buf); 41 } 42 } 43 } 44 recycle(&mut self, mut raw: B::CommandBuffer)45 fn recycle(&mut self, mut raw: B::CommandBuffer) { 46 unsafe { 47 raw.reset(false); 48 } 49 self.available.push(raw); 50 } 51 allocate(&mut self) -> B::CommandBuffer52 fn allocate(&mut self) -> B::CommandBuffer { 53 if self.available.is_empty() { 54 self.total += GROW_AMOUNT; 55 unsafe { 56 self.raw.allocate( 57 GROW_AMOUNT, 58 hal::command::Level::Primary, 59 &mut self.available, 60 ) 61 }; 62 } 63 self.available.pop().unwrap() 64 } 65 destroy(mut self, device: &B::Device)66 fn destroy(mut self, device: &B::Device) { 67 unsafe { 68 self.raw.free(self.available.into_iter()); 69 device.destroy_command_pool(self.raw); 70 } 71 } 72 } 73 74 #[derive(Debug)] 75 struct Inner<B: hal::Backend> { 76 pools: FastHashMap<thread::ThreadId, CommandPool<B>>, 77 } 78 79 #[derive(Debug)] 80 pub struct CommandAllocator<B: hal::Backend> { 81 queue_family: hal::queue::QueueFamilyId, 82 internal_thread_id: thread::ThreadId, 83 inner: Mutex<Inner<B>>, 84 } 85 86 impl<B: GfxBackend> CommandAllocator<B> { 87 #[allow(clippy::too_many_arguments)] allocate( &self, device_id: Stored<DeviceId>, device: &B::Device, limits: wgt::Limits, downlevel: wgt::DownlevelProperties, features: wgt::Features, private_features: PrivateFeatures, label: &crate::Label, #[cfg(feature = "trace")] enable_tracing: bool, ) -> Result<CommandBuffer<B>, CommandAllocatorError>88 pub(crate) fn allocate( 89 &self, 90 device_id: Stored<DeviceId>, 91 device: &B::Device, 92 limits: wgt::Limits, 93 downlevel: wgt::DownlevelProperties, 94 features: wgt::Features, 95 private_features: PrivateFeatures, 96 label: &crate::Label, 97 #[cfg(feature = "trace")] enable_tracing: bool, 98 ) -> Result<CommandBuffer<B>, CommandAllocatorError> { 99 //debug_assert_eq!(device_id.backend(), B::VARIANT); 100 let thread_id = thread::current().id(); 101 let mut inner = self.inner.lock(); 102 103 use std::collections::hash_map::Entry; 104 let pool = match inner.pools.entry(thread_id) { 105 Entry::Vacant(e) => { 106 log::info!("Starting on thread {:?}", thread_id); 107 let raw = unsafe { 108 device 109 .create_command_pool( 110 self.queue_family, 111 hal::pool::CommandPoolCreateFlags::RESET_INDIVIDUAL, 112 ) 113 .or(Err(DeviceError::OutOfMemory))? 114 }; 115 e.insert(CommandPool { 116 raw, 117 total: 0, 118 available: Vec::new(), 119 pending: Vec::new(), 120 }) 121 } 122 Entry::Occupied(e) => e.into_mut(), 123 }; 124 125 //Note: we have to allocate the first buffer right here, or otherwise 126 // the pool may be cleaned up by maintenance called from another thread. 127 128 Ok(CommandBuffer { 129 raw: vec![pool.allocate()], 130 status: CommandEncoderStatus::Recording, 131 recorded_thread_id: thread_id, 132 device_id, 133 trackers: TrackerSet::new(B::VARIANT), 134 used_swap_chains: Default::default(), 135 buffer_memory_init_actions: Default::default(), 136 limits, 137 downlevel, 138 private_features, 139 support_fill_buffer_texture: features.contains(wgt::Features::CLEAR_COMMANDS), 140 has_labels: label.is_some(), 141 #[cfg(feature = "trace")] 142 commands: if enable_tracing { 143 Some(Vec::new()) 144 } else { 145 None 146 }, 147 #[cfg(debug_assertions)] 148 label: label.to_string_or_default(), 149 }) 150 } 151 } 152 153 impl<B: hal::Backend> CommandAllocator<B> { new( queue_family: hal::queue::QueueFamilyId, device: &B::Device, ) -> Result<Self, CommandAllocatorError>154 pub fn new( 155 queue_family: hal::queue::QueueFamilyId, 156 device: &B::Device, 157 ) -> Result<Self, CommandAllocatorError> { 158 let internal_thread_id = thread::current().id(); 159 log::info!("Starting on (internal) thread {:?}", internal_thread_id); 160 let mut pools = FastHashMap::default(); 161 pools.insert( 162 internal_thread_id, 163 CommandPool { 164 raw: unsafe { 165 device 166 .create_command_pool( 167 queue_family, 168 hal::pool::CommandPoolCreateFlags::RESET_INDIVIDUAL, 169 ) 170 .or(Err(DeviceError::OutOfMemory))? 171 }, 172 total: 0, 173 available: Vec::new(), 174 pending: Vec::new(), 175 }, 176 ); 177 Ok(Self { 178 queue_family, 179 internal_thread_id, 180 inner: Mutex::new(Inner { pools }), 181 }) 182 } 183 allocate_for_thread_id(&self, thread_id: thread::ThreadId) -> B::CommandBuffer184 fn allocate_for_thread_id(&self, thread_id: thread::ThreadId) -> B::CommandBuffer { 185 let mut inner = self.inner.lock(); 186 inner.pools.get_mut(&thread_id).unwrap().allocate() 187 } 188 allocate_internal(&self) -> B::CommandBuffer189 pub fn allocate_internal(&self) -> B::CommandBuffer { 190 self.allocate_for_thread_id(self.internal_thread_id) 191 } 192 extend(&self, cmd_buf: &CommandBuffer<B>) -> B::CommandBuffer193 pub fn extend(&self, cmd_buf: &CommandBuffer<B>) -> B::CommandBuffer { 194 self.allocate_for_thread_id(cmd_buf.recorded_thread_id) 195 } 196 discard_internal(&self, raw: B::CommandBuffer)197 pub fn discard_internal(&self, raw: B::CommandBuffer) { 198 let mut inner = self.inner.lock(); 199 inner 200 .pools 201 .get_mut(&self.internal_thread_id) 202 .unwrap() 203 .recycle(raw); 204 } 205 discard(&self, mut cmd_buf: CommandBuffer<B>)206 pub fn discard(&self, mut cmd_buf: CommandBuffer<B>) { 207 cmd_buf.trackers.clear(); 208 let mut inner = self.inner.lock(); 209 let pool = inner.pools.get_mut(&cmd_buf.recorded_thread_id).unwrap(); 210 for raw in cmd_buf.raw { 211 pool.recycle(raw); 212 } 213 } 214 after_submit_internal(&self, raw: B::CommandBuffer, submit_index: SubmissionIndex)215 pub fn after_submit_internal(&self, raw: B::CommandBuffer, submit_index: SubmissionIndex) { 216 let mut inner = self.inner.lock(); 217 inner 218 .pools 219 .get_mut(&self.internal_thread_id) 220 .unwrap() 221 .pending 222 .push((raw, submit_index)); 223 } 224 after_submit( &self, cmd_buf: CommandBuffer<B>, device: &B::Device, submit_index: SubmissionIndex, )225 pub fn after_submit( 226 &self, 227 cmd_buf: CommandBuffer<B>, 228 device: &B::Device, 229 submit_index: SubmissionIndex, 230 ) { 231 // Record this command buffer as pending 232 let mut inner = self.inner.lock(); 233 let clear_label = cmd_buf.has_labels; 234 inner 235 .pools 236 .get_mut(&cmd_buf.recorded_thread_id) 237 .unwrap() 238 .pending 239 .extend(cmd_buf.raw.into_iter().map(|mut raw| { 240 if clear_label { 241 unsafe { device.set_command_buffer_name(&mut raw, "") }; 242 } 243 (raw, submit_index) 244 })); 245 } 246 maintain(&self, device: &B::Device, last_done_index: SubmissionIndex)247 pub fn maintain(&self, device: &B::Device, last_done_index: SubmissionIndex) { 248 profiling::scope!("maintain", "CommandAllocator"); 249 let mut inner = self.inner.lock(); 250 let mut remove_threads = Vec::new(); 251 for (&thread_id, pool) in inner.pools.iter_mut() { 252 pool.maintain(last_done_index); 253 if pool.total == pool.available.len() && thread_id != self.internal_thread_id { 254 assert!(pool.pending.is_empty()); 255 remove_threads.push(thread_id); 256 } 257 } 258 for thread_id in remove_threads { 259 log::info!("Removing from thread {:?}", thread_id); 260 let pool = inner.pools.remove(&thread_id).unwrap(); 261 pool.destroy(device); 262 } 263 } 264 destroy(self, device: &B::Device)265 pub fn destroy(self, device: &B::Device) { 266 let mut inner = self.inner.lock(); 267 for (_, mut pool) in inner.pools.drain() { 268 while let Some((raw, _)) = pool.pending.pop() { 269 pool.recycle(raw); 270 } 271 if pool.total != pool.available.len() { 272 log::error!( 273 "Some command buffers are still recorded, only tracking {} / {}", 274 pool.available.len(), 275 pool.total 276 ); 277 } 278 pool.destroy(device); 279 } 280 } 281 } 282 283 #[derive(Clone, Debug, Error)] 284 pub enum CommandAllocatorError { 285 #[error(transparent)] 286 Device(#[from] DeviceError), 287 } 288