1 /* This Source Code Form is subject to the terms of the Mozilla Public 2 * License, v. 2.0. If a copy of the MPL was not distributed with this 3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 4 use super::{ 5 action::{Action, RequestAction}, 6 client::with_bits_client, 7 dispatch_callback::{ 8 maybe_dispatch_via_callback, CallbackExpected, CallbackOptional, IsCallbackExpected, 9 }, 10 error::BitsTaskError, 11 from_threadbound::{expect_from_threadbound_option, DataType}, 12 BitsRequest, 13 }; 14 15 use bits_client::{BitsClient, Guid}; 16 use crossbeam_utils::atomic::AtomicCell; 17 use log::info; 18 use moz_task::Task; 19 use nserror::nsresult; 20 use xpcom::{interfaces::nsIBitsCallback, RefPtr, ThreadBoundRefPtr}; 21 22 type RunFn<D> = fn(Guid, &D, &mut BitsClient) -> Result<(), BitsTaskError>; 23 type DoneFn = fn(&BitsRequest, bool) -> Result<(), BitsTaskError>; 24 25 pub struct RequestTask<D> { 26 request: AtomicCell<Option<ThreadBoundRefPtr<BitsRequest>>>, 27 guid: Guid, 28 action: RequestAction, 29 task_data: D, 30 run_fn: RunFn<D>, 31 maybe_done_fn: Option<DoneFn>, 32 callback: AtomicCell<Option<ThreadBoundRefPtr<nsIBitsCallback>>>, 33 callback_presence: IsCallbackExpected, 34 result: AtomicCell<Option<Result<(), BitsTaskError>>>, 35 } 36 37 impl<D> RequestTask<D> 38 where 39 D: Sync + Send, 40 { new( request: RefPtr<BitsRequest>, guid: Guid, action: RequestAction, task_data: D, run_fn: RunFn<D>, maybe_done_fn: Option<DoneFn>, callback: Option<RefPtr<nsIBitsCallback>>, callback_presence: IsCallbackExpected, ) -> RequestTask<D>41 pub fn new( 42 request: RefPtr<BitsRequest>, 43 guid: Guid, 44 action: RequestAction, 45 task_data: D, 46 run_fn: RunFn<D>, 47 maybe_done_fn: Option<DoneFn>, 48 callback: Option<RefPtr<nsIBitsCallback>>, 49 callback_presence: IsCallbackExpected, 50 ) -> RequestTask<D> { 51 RequestTask { 52 request: AtomicCell::new(Some(ThreadBoundRefPtr::new(request))), 53 guid, 54 action, 55 task_data, 56 run_fn, 57 maybe_done_fn, 58 callback: AtomicCell::new(callback.map(ThreadBoundRefPtr::new)), 59 result: AtomicCell::new(None), 60 callback_presence, 61 } 62 } 63 } 64 65 impl<D> Task for RequestTask<D> { run(&self)66 fn run(&self) { 67 let result = with_bits_client(self.action.into(), |client| { 68 (self.run_fn)(self.guid.clone(), &self.task_data, client) 69 }); 70 self.result.store(Some(result)); 71 } 72 done(&self) -> Result<(), nsresult>73 fn done(&self) -> Result<(), nsresult> { 74 // If TaskRunnable.run() calls Task.done() to return a result 75 // on the main thread before TaskRunnable.run() returns on the worker 76 // thread, then the Task will get dropped on the worker thread. 77 // 78 // But the callback is an nsXPCWrappedJS that isn't safe to release 79 // on the worker thread. So we move it out of the Task here to ensure 80 // it gets released on the main thread. 81 let maybe_tb_callback = self.callback.swap(None); 82 // It also isn't safe to drop the BitsRequest RefPtr off-thread, 83 // because BitsRequest refcounting is non-atomic 84 let maybe_tb_request = self.request.swap(None); 85 86 let action: Action = self.action.into(); 87 let maybe_callback = 88 expect_from_threadbound_option(&maybe_tb_callback, DataType::Callback, action); 89 90 // Immediately invoked function expression to allow for the ? operator 91 let result: Result<(), BitsTaskError> = (|| { 92 let request = 93 expect_from_threadbound_option(&maybe_tb_request, DataType::BitsRequest, action)?; 94 95 let maybe_result = self.result.swap(None); 96 97 let success = if let Some(result) = maybe_result.as_ref() { 98 result.is_ok() 99 } else { 100 false 101 }; 102 103 if let Some(done_fn) = self.maybe_done_fn { 104 done_fn(request, success)?; 105 } 106 107 maybe_result.ok_or_else(|| BitsTaskError::missing_result(action))? 108 })(); 109 info!("BITS Request Task completed: {:?}", result); 110 maybe_dispatch_via_callback(result, maybe_callback, self.callback_presence) 111 } 112 } 113 114 pub struct CompleteTask(RequestTask<()>); 115 116 impl Task for CompleteTask { run(&self)117 fn run(&self) { 118 self.0.run(); 119 } 120 done(&self) -> Result<(), nsresult>121 fn done(&self) -> Result<(), nsresult> { 122 self.0.done() 123 } 124 } 125 126 impl CompleteTask { new( request: RefPtr<BitsRequest>, id: Guid, callback: RefPtr<nsIBitsCallback>, ) -> CompleteTask127 pub fn new( 128 request: RefPtr<BitsRequest>, 129 id: Guid, 130 callback: RefPtr<nsIBitsCallback>, 131 ) -> CompleteTask { 132 CompleteTask(RequestTask::new( 133 request, 134 id, 135 RequestAction::Complete, 136 (), 137 CompleteTask::run_fn, 138 Some(CompleteTask::done_fn), 139 Some(callback), 140 CallbackExpected, 141 )) 142 } 143 run_fn(id: Guid, _data: &(), client: &mut BitsClient) -> Result<(), BitsTaskError>144 fn run_fn(id: Guid, _data: &(), client: &mut BitsClient) -> Result<(), BitsTaskError> { 145 client 146 .complete_job(id) 147 .map_err(|pipe_error| BitsTaskError::from_pipe(Action::Complete, pipe_error))??; 148 Ok(()) 149 } 150 done_fn(request: &BitsRequest, success: bool) -> Result<(), BitsTaskError>151 fn done_fn(request: &BitsRequest, success: bool) -> Result<(), BitsTaskError> { 152 if success { 153 request.on_finished(); 154 } 155 Ok(()) 156 } 157 } 158 159 pub struct CancelTask(RequestTask<()>); 160 161 impl Task for CancelTask { run(&self)162 fn run(&self) { 163 self.0.run(); 164 } 165 done(&self) -> Result<(), nsresult>166 fn done(&self) -> Result<(), nsresult> { 167 self.0.done() 168 } 169 } 170 171 impl CancelTask { new( request: RefPtr<BitsRequest>, id: Guid, callback: Option<RefPtr<nsIBitsCallback>>, ) -> CancelTask172 pub fn new( 173 request: RefPtr<BitsRequest>, 174 id: Guid, 175 callback: Option<RefPtr<nsIBitsCallback>>, 176 ) -> CancelTask { 177 let callback_presence = if callback.is_some() { 178 CallbackExpected 179 } else { 180 CallbackOptional 181 }; 182 183 CancelTask(RequestTask::new( 184 request, 185 id, 186 RequestAction::Cancel, 187 (), 188 CancelTask::run_fn, 189 Some(CancelTask::done_fn), 190 callback, 191 callback_presence, 192 )) 193 } 194 run_fn(id: Guid, _data: &(), client: &mut BitsClient) -> Result<(), BitsTaskError>195 fn run_fn(id: Guid, _data: &(), client: &mut BitsClient) -> Result<(), BitsTaskError> { 196 client 197 .cancel_job(id) 198 .map_err(|pipe_error| BitsTaskError::from_pipe(Action::Cancel, pipe_error))??; 199 Ok(()) 200 } 201 done_fn(request: &BitsRequest, success: bool) -> Result<(), BitsTaskError>202 fn done_fn(request: &BitsRequest, success: bool) -> Result<(), BitsTaskError> { 203 request.finish_cancel_action(success); 204 Ok(()) 205 } 206 } 207 208 pub struct SuspendTask(RequestTask<()>); 209 210 impl Task for SuspendTask { run(&self)211 fn run(&self) { 212 self.0.run(); 213 } 214 done(&self) -> Result<(), nsresult>215 fn done(&self) -> Result<(), nsresult> { 216 self.0.done() 217 } 218 } 219 220 impl SuspendTask { new( request: RefPtr<BitsRequest>, id: Guid, callback: Option<RefPtr<nsIBitsCallback>>, ) -> SuspendTask221 pub fn new( 222 request: RefPtr<BitsRequest>, 223 id: Guid, 224 callback: Option<RefPtr<nsIBitsCallback>>, 225 ) -> SuspendTask { 226 let callback_presence = if callback.is_some() { 227 CallbackExpected 228 } else { 229 CallbackOptional 230 }; 231 232 SuspendTask(RequestTask::new( 233 request, 234 id, 235 RequestAction::Suspend, 236 (), 237 SuspendTask::run_fn, 238 None, 239 callback, 240 callback_presence, 241 )) 242 } 243 run_fn(id: Guid, _data: &(), client: &mut BitsClient) -> Result<(), BitsTaskError>244 fn run_fn(id: Guid, _data: &(), client: &mut BitsClient) -> Result<(), BitsTaskError> { 245 client 246 .suspend_job(id) 247 .map_err(|pipe_error| BitsTaskError::from_pipe(Action::Suspend, pipe_error))??; 248 Ok(()) 249 } 250 } 251 252 pub struct ResumeTask(RequestTask<()>); 253 254 impl Task for ResumeTask { run(&self)255 fn run(&self) { 256 self.0.run(); 257 } 258 done(&self) -> Result<(), nsresult>259 fn done(&self) -> Result<(), nsresult> { 260 self.0.done() 261 } 262 } 263 264 impl ResumeTask { new( request: RefPtr<BitsRequest>, id: Guid, callback: Option<RefPtr<nsIBitsCallback>>, ) -> ResumeTask265 pub fn new( 266 request: RefPtr<BitsRequest>, 267 id: Guid, 268 callback: Option<RefPtr<nsIBitsCallback>>, 269 ) -> ResumeTask { 270 let callback_presence = if callback.is_some() { 271 CallbackExpected 272 } else { 273 CallbackOptional 274 }; 275 276 ResumeTask(RequestTask::new( 277 request, 278 id, 279 RequestAction::Resume, 280 (), 281 ResumeTask::run_fn, 282 None, 283 callback, 284 callback_presence, 285 )) 286 } 287 run_fn(id: Guid, _data: &(), client: &mut BitsClient) -> Result<(), BitsTaskError>288 fn run_fn(id: Guid, _data: &(), client: &mut BitsClient) -> Result<(), BitsTaskError> { 289 client 290 .resume_job(id) 291 .map_err(|pipe_error| BitsTaskError::from_pipe(Action::Resume, pipe_error))??; 292 Ok(()) 293 } 294 } 295 296 pub struct ChangeMonitorIntervalTask(RequestTask<u32>); 297 298 impl Task for ChangeMonitorIntervalTask { run(&self)299 fn run(&self) { 300 self.0.run(); 301 } 302 done(&self) -> Result<(), nsresult>303 fn done(&self) -> Result<(), nsresult> { 304 self.0.done() 305 } 306 } 307 308 impl ChangeMonitorIntervalTask { new( request: RefPtr<BitsRequest>, id: Guid, update_interval_ms: u32, callback: RefPtr<nsIBitsCallback>, ) -> ChangeMonitorIntervalTask309 pub fn new( 310 request: RefPtr<BitsRequest>, 311 id: Guid, 312 update_interval_ms: u32, 313 callback: RefPtr<nsIBitsCallback>, 314 ) -> ChangeMonitorIntervalTask { 315 ChangeMonitorIntervalTask(RequestTask::new( 316 request, 317 id, 318 RequestAction::SetMonitorInterval, 319 update_interval_ms, 320 ChangeMonitorIntervalTask::run_fn, 321 None, 322 Some(callback), 323 CallbackExpected, 324 )) 325 } 326 run_fn( id: Guid, update_interval_ms: &u32, client: &mut BitsClient, ) -> Result<(), BitsTaskError>327 fn run_fn( 328 id: Guid, 329 update_interval_ms: &u32, 330 client: &mut BitsClient, 331 ) -> Result<(), BitsTaskError> { 332 client 333 .set_update_interval(id, *update_interval_ms) 334 .map_err(|pipe_error| { 335 BitsTaskError::from_pipe(Action::SetMonitorInterval, pipe_error) 336 })??; 337 Ok(()) 338 } 339 } 340 341 #[derive(Debug, PartialEq, Clone, Copy)] 342 pub enum Priority { 343 High, 344 Low, 345 } 346 347 pub struct SetPriorityTask(RequestTask<Priority>); 348 349 impl Task for SetPriorityTask { run(&self)350 fn run(&self) { 351 self.0.run(); 352 } 353 done(&self) -> Result<(), nsresult>354 fn done(&self) -> Result<(), nsresult> { 355 self.0.done() 356 } 357 } 358 359 impl SetPriorityTask { new( request: RefPtr<BitsRequest>, id: Guid, priority: Priority, callback: RefPtr<nsIBitsCallback>, ) -> SetPriorityTask360 pub fn new( 361 request: RefPtr<BitsRequest>, 362 id: Guid, 363 priority: Priority, 364 callback: RefPtr<nsIBitsCallback>, 365 ) -> SetPriorityTask { 366 SetPriorityTask(RequestTask::new( 367 request, 368 id, 369 RequestAction::SetPriority, 370 priority, 371 SetPriorityTask::run_fn, 372 None, 373 Some(callback), 374 CallbackExpected, 375 )) 376 } 377 run_fn(id: Guid, priority: &Priority, client: &mut BitsClient) -> Result<(), BitsTaskError>378 fn run_fn(id: Guid, priority: &Priority, client: &mut BitsClient) -> Result<(), BitsTaskError> { 379 client 380 .set_job_priority(id, *priority == Priority::High) 381 .map_err(|pipe_error| BitsTaskError::from_pipe(Action::SetPriority, pipe_error))??; 382 Ok(()) 383 } 384 } 385 386 pub struct SetNoProgressTimeoutTask(RequestTask<u32>); 387 388 impl Task for SetNoProgressTimeoutTask { run(&self)389 fn run(&self) { 390 self.0.run(); 391 } 392 done(&self) -> Result<(), nsresult>393 fn done(&self) -> Result<(), nsresult> { 394 self.0.done() 395 } 396 } 397 398 impl SetNoProgressTimeoutTask { new( request: RefPtr<BitsRequest>, id: Guid, timeout_secs: u32, callback: RefPtr<nsIBitsCallback>, ) -> SetNoProgressTimeoutTask399 pub fn new( 400 request: RefPtr<BitsRequest>, 401 id: Guid, 402 timeout_secs: u32, 403 callback: RefPtr<nsIBitsCallback>, 404 ) -> SetNoProgressTimeoutTask { 405 SetNoProgressTimeoutTask(RequestTask::new( 406 request, 407 id, 408 RequestAction::SetNoProgressTimeout, 409 timeout_secs, 410 SetNoProgressTimeoutTask::run_fn, 411 None, 412 Some(callback), 413 CallbackExpected, 414 )) 415 } 416 run_fn(id: Guid, timeout_secs: &u32, client: &mut BitsClient) -> Result<(), BitsTaskError>417 fn run_fn(id: Guid, timeout_secs: &u32, client: &mut BitsClient) -> Result<(), BitsTaskError> { 418 client 419 .set_no_progress_timeout(id, *timeout_secs) 420 .map_err(|pipe_error| { 421 BitsTaskError::from_pipe(Action::SetNoProgressTimeout, pipe_error) 422 })??; 423 Ok(()) 424 } 425 } 426