1 /* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4 use bits_interface::{error::ErrorType, BitsRequest};
5
6 use bits_client::{
7 bits_protocol::HResultMessage, BitsJobState, BitsMonitorClient, Guid, JobStatus, PipeError,
8 };
9 use crossbeam_utils::atomic::AtomicCell;
10 use log::error;
11 use moz_task::{get_main_thread, is_main_thread};
12 use nserror::{nsresult, NS_ERROR_ABORT, NS_ERROR_FAILURE, NS_OK};
13 use nsstring::{nsACString, nsCString};
14 use xpcom::{
15 interfaces::{nsIEventTarget, nsIThread},
16 xpcom, xpcom_method, RefPtr, ThreadBoundRefPtr,
17 };
18
19 /// This function takes the output of BitsMonitorClient::get_status() and uses
20 /// it to determine whether the the transfer has started. If the argument
21 /// contains an error, the transfer is considered started because we also
22 /// consider a transfer stopped on error.
23 /// This function is used to determine whether the OnStartRequest and OnProgress
24 /// observer functions should be called.
transfer_started(status: &Result<Result<JobStatus, HResultMessage>, PipeError>) -> bool25 fn transfer_started(status: &Result<Result<JobStatus, HResultMessage>, PipeError>) -> bool {
26 match status.as_ref() {
27 Ok(Ok(job_status)) => match job_status.state {
28 BitsJobState::Queued | BitsJobState::Connecting => false,
29 _ => true,
30 },
31 Ok(Err(_)) => true,
32 Err(_) => true,
33 }
34 }
35
36 /// This function takes the output of BitsMonitorClient::get_status() and uses
37 /// it to determine whether the the transfer has stopped. If the argument
38 /// contains an error, the transfer is considered stopped.
39 /// A number of things will be done when a transfer is completed, such as
40 /// calling the observer's OnStopRequest method.
transfer_completed(status: &Result<Result<JobStatus, HResultMessage>, PipeError>) -> bool41 fn transfer_completed(status: &Result<Result<JobStatus, HResultMessage>, PipeError>) -> bool {
42 match status.as_ref() {
43 Ok(Ok(job_status)) => match job_status.state {
44 BitsJobState::Error
45 | BitsJobState::Transferred
46 | BitsJobState::Acknowledged
47 | BitsJobState::Cancelled => true,
48 _ => false,
49 },
50 Ok(Err(_)) => true,
51 Err(_) => true,
52 }
53 }
54
55 /// BitsRequest implements nsIRequest, which means that it must be able to
56 /// provide an nsresult status code. This function provides such a status code
57 /// based on the output of BitsMonitorClient::get_status().
status_to_nsresult(status: &Result<Result<JobStatus, HResultMessage>, PipeError>) -> nsresult58 fn status_to_nsresult(status: &Result<Result<JobStatus, HResultMessage>, PipeError>) -> nsresult {
59 match status.as_ref() {
60 Ok(Ok(job_status)) => match job_status.state {
61 BitsJobState::Cancelled => NS_ERROR_ABORT,
62 BitsJobState::Transferred | BitsJobState::Acknowledged => NS_OK,
63 _ => NS_ERROR_FAILURE,
64 },
65 Ok(Err(_)) => NS_ERROR_FAILURE,
66 Err(_) => NS_ERROR_FAILURE,
67 }
68 }
69
70 /// This function takes the output of BitsMonitorClient::get_status() and uses
71 /// it to determine the result value of the request. This will take the form of
72 /// an Optional ErrorType value with a None value indicating success.
status_to_request_result( status: &Result<Result<JobStatus, HResultMessage>, PipeError>, ) -> Option<ErrorType>73 fn status_to_request_result(
74 status: &Result<Result<JobStatus, HResultMessage>, PipeError>,
75 ) -> Option<ErrorType> {
76 match status.as_ref() {
77 Ok(Ok(job_status)) => match job_status.state {
78 BitsJobState::Transferred | BitsJobState::Acknowledged => None,
79 BitsJobState::Cancelled => Some(ErrorType::BitsStateCancelled),
80 BitsJobState::Error => Some(ErrorType::BitsStateError),
81 BitsJobState::TransientError => Some(ErrorType::BitsStateTransientError),
82 _ => Some(ErrorType::BitsStateUnexpected),
83 },
84 Ok(Err(_)) => Some(ErrorType::FailedToGetJobStatus),
85 Err(pipe_error) => Some(pipe_error.into()),
86 }
87 }
88
89 /// MonitorRunnable is an nsIRunnable meant to be dispatched off thread. It will
90 /// perform the following actions:
91 /// 1. Call BitsMonitorClient::get_status and store the result.
92 /// 2. Dispatch itself back to the main thread.
93 /// 3. Report the status to the observer.
94 /// 4. If the transfer has finished, free its data and return, otherwise:
95 /// 5. Dispatch itself back to its original thread and repeat from step 1.
96 #[derive(xpcom)]
97 #[xpimplements(nsIRunnable, nsINamed)]
98 #[refcnt = "atomic"]
99 pub struct InitMonitorRunnable {
100 request: AtomicCell<Option<ThreadBoundRefPtr<BitsRequest>>>,
101 id: Guid,
102 timeout: u32,
103 monitor_client: AtomicCell<Option<BitsMonitorClient>>,
104 // This cell contains an Option, possibly containing the return value of
105 // BitsMonitorClient::get_status.
106 status: AtomicCell<Option<Result<Result<JobStatus, HResultMessage>, PipeError>>>,
107 request_started: AtomicCell<bool>,
108 in_error_state: AtomicCell<bool>,
109 }
110
111 impl MonitorRunnable {
new( request: RefPtr<BitsRequest>, id: Guid, timeout: u32, monitor_client: BitsMonitorClient, ) -> RefPtr<MonitorRunnable>112 pub fn new(
113 request: RefPtr<BitsRequest>,
114 id: Guid,
115 timeout: u32,
116 monitor_client: BitsMonitorClient,
117 ) -> RefPtr<MonitorRunnable> {
118 MonitorRunnable::allocate(InitMonitorRunnable {
119 request: AtomicCell::new(Some(ThreadBoundRefPtr::new(request))),
120 id,
121 timeout,
122 monitor_client: AtomicCell::new(Some(monitor_client)),
123 status: AtomicCell::new(None),
124 request_started: AtomicCell::new(false),
125 in_error_state: AtomicCell::new(false),
126 })
127 }
128
dispatch(&self, thread: RefPtr<nsIThread>) -> Result<(), nsresult>129 pub fn dispatch(&self, thread: RefPtr<nsIThread>) -> Result<(), nsresult> {
130 unsafe { thread.DispatchFromScript(self.coerce(), nsIEventTarget::DISPATCH_NORMAL as u32) }
131 .to_result()
132 }
133
free_mainthread_data(&self)134 fn free_mainthread_data(&self) {
135 if is_main_thread() {
136 // This is not safe to free unless on the main thread
137 self.request.swap(None);
138 } else {
139 error!("Attempting to free data on the main thread, but not on the main thread");
140 }
141 }
142
143 xpcom_method!(run => Run());
144
145 /// This method is essentially a error-handling wrapper around try_run.
146 /// This is done to make it easier to ensure that main-thread data is freed
147 /// on the main thread.
run(&self) -> Result<(), nsresult>148 pub fn run(&self) -> Result<(), nsresult> {
149 if self.in_error_state.load() {
150 self.free_mainthread_data();
151 return Err(NS_ERROR_FAILURE);
152 }
153
154 self.try_run().or_else(|error_message| {
155 error!("{}", error_message);
156
157 // Once an error has been encountered, we need to free all of our
158 // data, but it all needs to be freed on the main thread.
159 self.in_error_state.store(true);
160 if is_main_thread() {
161 self.free_mainthread_data();
162 Err(NS_ERROR_FAILURE)
163 } else {
164 self.dispatch(get_main_thread()?)
165 }
166 })
167 }
168
169 /// This function performs all the primary functionality of MonitorRunnable.
170 /// See the documentation for InitMonitorRunnable/MonitorRunnable for
171 /// details.
try_run(&self) -> Result<(), String>172 pub fn try_run(&self) -> Result<(), String> {
173 if !is_main_thread() {
174 let mut monitor_client = self
175 .monitor_client
176 .swap(None)
177 .ok_or("Missing monitor client")?;
178 self.status
179 .store(Some(monitor_client.get_status(self.timeout)));
180 self.monitor_client.store(Some(monitor_client));
181
182 let main_thread =
183 get_main_thread().map_err(|rv| format!("Unable to get main thread: {}", rv))?;
184
185 self.dispatch(main_thread)
186 .map_err(|rv| format!("Unable to dispatch to main thread: {}", rv))
187 } else {
188 let status = self.status.swap(None).ok_or("Missing status object")?;
189 let tb_request = self.request.swap(None).ok_or("Missing request")?;
190
191 // This block bounds the scope for request to ensure that it ends
192 // before re-storing tb_request.
193 let maybe_next_thread: Option<RefPtr<nsIThread>> = {
194 let request = tb_request
195 .get_ref()
196 .ok_or("BitsRequest is on the wrong thread")?;
197
198 if !self.request_started.load() && transfer_started(&status) {
199 self.request_started.store(true);
200 request.on_start();
201 }
202
203 if self.request_started.load() {
204 if let Ok(Ok(job_status)) = status.as_ref() {
205 let transferred_bytes = job_status.progress.transferred_bytes as i64;
206 let total_bytes = match job_status.progress.total_bytes {
207 Some(total) => total as i64,
208 None => -1i64,
209 };
210 request.on_progress(transferred_bytes, total_bytes);
211 }
212 }
213
214 if transfer_completed(&status) {
215 request.on_stop(Some((
216 status_to_nsresult(&status),
217 status_to_request_result(&status),
218 )));
219
220 // Transfer completed. No need to dispatch back to the monitor thread.
221 None
222 } else {
223 Some(
224 request
225 .get_monitor_thread()
226 .ok_or("Missing monitor thread")?,
227 )
228 }
229 };
230
231 self.request.store(Some(tb_request));
232
233 match maybe_next_thread {
234 Some(next_thread) => self
235 .dispatch(next_thread)
236 .map_err(|rv| format!("Unable to dispatch to thread: {}", rv)),
237 None => {
238 self.free_mainthread_data();
239 Ok(())
240 }
241 }
242 }
243 }
244
245 xpcom_method!(get_name => GetName() -> nsACString);
get_name(&self) -> Result<nsCString, nsresult>246 fn get_name(&self) -> Result<nsCString, nsresult> {
247 Ok(nsCString::from("BitsRequest::Monitor"))
248 }
249 }
250