1 /* This Source Code Form is subject to the terms of the Mozilla Public
2  * License, v. 2.0. If a copy of the MPL was not distributed with this
3  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4 use bits_interface::{error::ErrorType, BitsRequest};
5 
6 use bits_client::{
7     bits_protocol::HResultMessage, BitsJobState, BitsMonitorClient, Guid, JobStatus, PipeError,
8 };
9 use crossbeam_utils::atomic::AtomicCell;
10 use log::error;
11 use moz_task::{get_main_thread, is_main_thread};
12 use nserror::{nsresult, NS_ERROR_ABORT, NS_ERROR_FAILURE, NS_OK};
13 use nsstring::{nsACString, nsCString};
14 use xpcom::{
15     interfaces::{nsIEventTarget, nsIThread},
16     xpcom, xpcom_method, RefPtr, ThreadBoundRefPtr,
17 };
18 
19 /// This function takes the output of BitsMonitorClient::get_status() and uses
20 /// it to determine whether the the transfer has started. If the argument
21 /// contains an error, the transfer is considered started because we also
22 /// consider a transfer stopped on error.
23 /// This function is used to determine whether the OnStartRequest and OnProgress
24 /// observer functions should be called.
transfer_started(status: &Result<Result<JobStatus, HResultMessage>, PipeError>) -> bool25 fn transfer_started(status: &Result<Result<JobStatus, HResultMessage>, PipeError>) -> bool {
26     match status.as_ref() {
27         Ok(Ok(job_status)) => match job_status.state {
28             BitsJobState::Queued | BitsJobState::Connecting => false,
29             _ => true,
30         },
31         Ok(Err(_)) => true,
32         Err(_) => true,
33     }
34 }
35 
36 /// This function takes the output of BitsMonitorClient::get_status() and uses
37 /// it to determine whether the the transfer has stopped. If the argument
38 /// contains an error, the transfer is considered stopped.
39 /// A number of things will be done when a transfer is completed, such as
40 /// calling the observer's OnStopRequest method.
transfer_completed(status: &Result<Result<JobStatus, HResultMessage>, PipeError>) -> bool41 fn transfer_completed(status: &Result<Result<JobStatus, HResultMessage>, PipeError>) -> bool {
42     match status.as_ref() {
43         Ok(Ok(job_status)) => match job_status.state {
44             BitsJobState::Error
45             | BitsJobState::Transferred
46             | BitsJobState::Acknowledged
47             | BitsJobState::Cancelled => true,
48             _ => false,
49         },
50         Ok(Err(_)) => true,
51         Err(_) => true,
52     }
53 }
54 
55 /// BitsRequest implements nsIRequest, which means that it must be able to
56 /// provide an nsresult status code. This function provides such a status code
57 /// based on the output of BitsMonitorClient::get_status().
status_to_nsresult(status: &Result<Result<JobStatus, HResultMessage>, PipeError>) -> nsresult58 fn status_to_nsresult(status: &Result<Result<JobStatus, HResultMessage>, PipeError>) -> nsresult {
59     match status.as_ref() {
60         Ok(Ok(job_status)) => match job_status.state {
61             BitsJobState::Cancelled => NS_ERROR_ABORT,
62             BitsJobState::Transferred | BitsJobState::Acknowledged => NS_OK,
63             _ => NS_ERROR_FAILURE,
64         },
65         Ok(Err(_)) => NS_ERROR_FAILURE,
66         Err(_) => NS_ERROR_FAILURE,
67     }
68 }
69 
70 /// This function takes the output of BitsMonitorClient::get_status() and uses
71 /// it to determine the result value of the request. This will take the form of
72 /// an Optional ErrorType value with a None value indicating success.
status_to_request_result( status: &Result<Result<JobStatus, HResultMessage>, PipeError>, ) -> Option<ErrorType>73 fn status_to_request_result(
74     status: &Result<Result<JobStatus, HResultMessage>, PipeError>,
75 ) -> Option<ErrorType> {
76     match status.as_ref() {
77         Ok(Ok(job_status)) => match job_status.state {
78             BitsJobState::Transferred | BitsJobState::Acknowledged => None,
79             BitsJobState::Cancelled => Some(ErrorType::BitsStateCancelled),
80             BitsJobState::Error => Some(ErrorType::BitsStateError),
81             BitsJobState::TransientError => Some(ErrorType::BitsStateTransientError),
82             _ => Some(ErrorType::BitsStateUnexpected),
83         },
84         Ok(Err(_)) => Some(ErrorType::FailedToGetJobStatus),
85         Err(pipe_error) => Some(pipe_error.into()),
86     }
87 }
88 
/// MonitorRunnable is an nsIRunnable meant to be dispatched off thread. It will
/// perform the following actions:
///   1. Call BitsMonitorClient::get_status and store the result.
///   2. Dispatch itself back to the main thread.
///   3. Report the status to the observer.
///   4. If the transfer has finished, free its data and return, otherwise:
///   5. Dispatch itself to the request's monitor thread and repeat from
///      step 1.
///
/// If any step fails, the runnable enters an error state, makes sure its
/// main-thread-only data is released on the main thread, and stops.
#[derive(xpcom)]
#[xpimplements(nsIRunnable, nsINamed)]
#[refcnt = "atomic"]
pub struct InitMonitorRunnable {
    // The request whose observer is notified. It is only taken out of the cell
    // and dereferenced on the main thread, and free_mainthread_data() only
    // clears it when running on the main thread.
    request: AtomicCell<Option<ThreadBoundRefPtr<BitsRequest>>>,
    // NOTE(review): presumably the GUID of the BITS job being monitored; it is
    // stored by new() but not read anywhere in the code visible here.
    id: Guid,
    // Passed unchanged to BitsMonitorClient::get_status on every poll.
    // NOTE(review): units are not visible here - confirm against bits_client.
    timeout: u32,
    // The client polled off the main thread. It is temporarily taken out of
    // the cell while get_status runs and put back afterwards.
    monitor_client: AtomicCell<Option<BitsMonitorClient>>,
    // This cell contains an Option, possibly containing the return value of
    // BitsMonitorClient::get_status.
    status: AtomicCell<Option<Result<Result<JobStatus, HResultMessage>, PipeError>>>,
    // Set to true once the request's on_start() has been called, so that it is
    // called at most once and progress is only reported after it.
    request_started: AtomicCell<bool>,
    // Set to true once try_run() has failed. The next run() then only frees
    // the main-thread data and returns NS_ERROR_FAILURE.
    in_error_state: AtomicCell<bool>,
}
110 
111 impl MonitorRunnable {
new( request: RefPtr<BitsRequest>, id: Guid, timeout: u32, monitor_client: BitsMonitorClient, ) -> RefPtr<MonitorRunnable>112     pub fn new(
113         request: RefPtr<BitsRequest>,
114         id: Guid,
115         timeout: u32,
116         monitor_client: BitsMonitorClient,
117     ) -> RefPtr<MonitorRunnable> {
118         MonitorRunnable::allocate(InitMonitorRunnable {
119             request: AtomicCell::new(Some(ThreadBoundRefPtr::new(request))),
120             id,
121             timeout,
122             monitor_client: AtomicCell::new(Some(monitor_client)),
123             status: AtomicCell::new(None),
124             request_started: AtomicCell::new(false),
125             in_error_state: AtomicCell::new(false),
126         })
127     }
128 
dispatch(&self, thread: RefPtr<nsIThread>) -> Result<(), nsresult>129     pub fn dispatch(&self, thread: RefPtr<nsIThread>) -> Result<(), nsresult> {
130         unsafe { thread.DispatchFromScript(self.coerce(), nsIEventTarget::DISPATCH_NORMAL as u32) }
131             .to_result()
132     }
133 
free_mainthread_data(&self)134     fn free_mainthread_data(&self) {
135         if is_main_thread() {
136             // This is not safe to free unless on the main thread
137             self.request.swap(None);
138         } else {
139             error!("Attempting to free data on the main thread, but not on the main thread");
140         }
141     }
142 
143     xpcom_method!(run => Run());
144 
145     /// This method is essentially a error-handling wrapper around try_run.
146     /// This is done to make it easier to ensure that main-thread data is freed
147     /// on the main thread.
run(&self) -> Result<(), nsresult>148     pub fn run(&self) -> Result<(), nsresult> {
149         if self.in_error_state.load() {
150             self.free_mainthread_data();
151             return Err(NS_ERROR_FAILURE);
152         }
153 
154         self.try_run().or_else(|error_message| {
155             error!("{}", error_message);
156 
157             // Once an error has been encountered, we need to free all of our
158             // data, but it all needs to be freed on the main thread.
159             self.in_error_state.store(true);
160             if is_main_thread() {
161                 self.free_mainthread_data();
162                 Err(NS_ERROR_FAILURE)
163             } else {
164                 self.dispatch(get_main_thread()?)
165             }
166         })
167     }
168 
169     /// This function performs all the primary functionality of MonitorRunnable.
170     /// See the documentation for InitMonitorRunnable/MonitorRunnable for
171     /// details.
try_run(&self) -> Result<(), String>172     pub fn try_run(&self) -> Result<(), String> {
173         if !is_main_thread() {
174             let mut monitor_client = self
175                 .monitor_client
176                 .swap(None)
177                 .ok_or("Missing monitor client")?;
178             self.status
179                 .store(Some(monitor_client.get_status(self.timeout)));
180             self.monitor_client.store(Some(monitor_client));
181 
182             let main_thread =
183                 get_main_thread().map_err(|rv| format!("Unable to get main thread: {}", rv))?;
184 
185             self.dispatch(main_thread)
186                 .map_err(|rv| format!("Unable to dispatch to main thread: {}", rv))
187         } else {
188             let status = self.status.swap(None).ok_or("Missing status object")?;
189             let tb_request = self.request.swap(None).ok_or("Missing request")?;
190 
191             // This block bounds the scope for request to ensure that it ends
192             // before re-storing tb_request.
193             let maybe_next_thread: Option<RefPtr<nsIThread>> = {
194                 let request = tb_request
195                     .get_ref()
196                     .ok_or("BitsRequest is on the wrong thread")?;
197 
198                 if !self.request_started.load() && transfer_started(&status) {
199                     self.request_started.store(true);
200                     request.on_start();
201                 }
202 
203                 if self.request_started.load() {
204                     if let Ok(Ok(job_status)) = status.as_ref() {
205                         let transferred_bytes = job_status.progress.transferred_bytes as i64;
206                         let total_bytes = match job_status.progress.total_bytes {
207                             Some(total) => total as i64,
208                             None => -1i64,
209                         };
210                         request.on_progress(transferred_bytes, total_bytes);
211                     }
212                 }
213 
214                 if transfer_completed(&status) {
215                     request.on_stop(Some((
216                         status_to_nsresult(&status),
217                         status_to_request_result(&status),
218                     )));
219 
220                     // Transfer completed. No need to dispatch back to the monitor thread.
221                     None
222                 } else {
223                     Some(
224                         request
225                             .get_monitor_thread()
226                             .ok_or("Missing monitor thread")?,
227                     )
228                 }
229             };
230 
231             self.request.store(Some(tb_request));
232 
233             match maybe_next_thread {
234                 Some(next_thread) => self
235                     .dispatch(next_thread)
236                     .map_err(|rv| format!("Unable to dispatch to thread: {}", rv)),
237                 None => {
238                     self.free_mainthread_data();
239                     Ok(())
240                 }
241             }
242         }
243     }
244 
245     xpcom_method!(get_name => GetName() -> nsACString);
get_name(&self) -> Result<nsCString, nsresult>246     fn get_name(&self) -> Result<nsCString, nsresult> {
247         Ok(nsCString::from("BitsRequest::Monitor"))
248     }
249 }
250