1 /* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5 extern crate xpcom;
6
7 use crossbeam_utils::atomic::AtomicCell;
8 use error::KeyValueError;
9 use moz_task::Task;
10 use nserror::{nsresult, NS_ERROR_FAILURE};
11 use nsstring::nsCString;
12 use owned_value::owned_to_variant;
13 use rkv::backend::{BackendInfo, SafeMode, SafeModeDatabase, SafeModeEnvironment};
14 use rkv::{Migrator, OwnedValue, StoreError, StoreOptions, Value};
15 use std::{
16 path::Path,
17 str,
18 sync::{Arc, RwLock},
19 };
20 use storage_variant::VariantType;
21 use xpcom::{
22 interfaces::{
23 nsIKeyValueDatabaseCallback, nsIKeyValueEnumeratorCallback, nsIKeyValueVariantCallback,
24 nsIKeyValueVoidCallback, nsIVariant,
25 },
26 RefPtr, ThreadBoundRefPtr,
27 };
28 use KeyValueDatabase;
29 use KeyValueEnumerator;
30 use KeyValuePairResult;
31
/// The rkv environment manager, specialized to the safe-mode environment type.
type Manager = rkv::Manager<SafeModeEnvironment>;
/// An rkv environment, specialized to the safe-mode environment type.
type Rkv = rkv::Rkv<SafeModeEnvironment>;
/// A single-value rkv store, specialized to the safe-mode database type.
type SingleStore = rkv::SingleStore<SafeModeDatabase>;
35
/// A macro to generate a done() implementation for a Task.
/// Takes one argument that specifies the type of the Task's callback function:
///   value: a callback function that takes a value
///   void: the callback function doesn't take a value
///
/// The "value" variant calls self.convert() to convert a successful result
/// into the value to pass to the callback function. So if you generate done()
/// for a callback that takes a value, ensure you also implement convert()!
///
/// Both variants expect the implementing type to have `callback` and `result`
/// fields that support `swap(None)`. The generated done() returns
/// NS_ERROR_FAILURE if the callback has already been taken or if
/// `get_ref()` on the thread-bound pointer yields nothing; otherwise it
/// returns the nsresult of the Resolve/Reject call converted via to_result().
macro_rules! task_done {
    (value) => {
        fn done(&self) -> Result<(), nsresult> {
            // If TaskRunnable.run() calls Task.done() to return a result
            // on the main thread before TaskRunnable.run() returns on the database
            // thread, then the Task will get dropped on the database thread.
            //
            // But the callback is an nsXPCWrappedJS that isn't safe to release
            // on the database thread.  So we move it out of the Task here to ensure
            // it gets released on the main thread.
            let threadbound = self.callback.swap(None).ok_or(NS_ERROR_FAILURE)?;
            let callback = threadbound.get_ref().ok_or(NS_ERROR_FAILURE)?;

            // The result is taken out of the cell as well, so a second call
            // would observe `None`.
            //
            // NOTE: if self.convert() fails, the `?` below returns that error
            // from done() directly and neither Resolve nor Reject is invoked.
            match self.result.swap(None) {
                Some(Ok(value)) => unsafe { callback.Resolve(self.convert(value)?.coerce()) },
                Some(Err(err)) => unsafe { callback.Reject(&*nsCString::from(err.to_string())) },
                // No result was stored before done() was called.
                None => unsafe { callback.Reject(&*nsCString::from("unexpected")) },
            }
            .to_result()
        }
    };

    (void) => {
        fn done(&self) -> Result<(), nsresult> {
            // If TaskRunnable.run() calls Task.done() to return a result
            // on the main thread before TaskRunnable.run() returns on the database
            // thread, then the Task will get dropped on the database thread.
            //
            // But the callback is an nsXPCWrappedJS that isn't safe to release
            // on the database thread.  So we move it out of the Task here to ensure
            // it gets released on the main thread.
            let threadbound = self.callback.swap(None).ok_or(NS_ERROR_FAILURE)?;
            let callback = threadbound.get_ref().ok_or(NS_ERROR_FAILURE)?;

            // The result is taken out of the cell as well, so a second call
            // would observe `None`.
            match self.result.swap(None) {
                Some(Ok(())) => unsafe { callback.Resolve() },
                Some(Err(err)) => unsafe { callback.Reject(&*nsCString::from(err.to_string())) },
                // No result was stored before done() was called.
                None => unsafe { callback.Reject(&*nsCString::from("unexpected")) },
            }
            .to_result()
        }
    };
}
87
/// A tuple comprising an Arc<RwLock<Rkv>> and a SingleStore, which is
/// the successful result of GetOrCreateTask.
///
/// We declare this alias because otherwise Clippy complains "error: very
/// complex type used. Consider factoring parts into `type` definitions"
/// (i.e. clippy::type-complexity) when we declare the type of
/// `GetOrCreateTask::result`.
type RkvStoreTuple = (Arc<RwLock<Rkv>>, SingleStore);
94
/// The load-ratio threshold for active resizing: GetOrCreateTask calls
/// `active_resize` when the environment's load ratio exceeds this value.
const RESIZE_RATIO: f32 = 0.85;

/// The threshold (50 MB) at which active resizing switches its policy from
/// doubling the map size to growing it by a constant increment.
const INCREMENTAL_RESIZE_THRESHOLD: usize = 52_428_800;

/// The incremental resize step (5 MB) used by active resizing once the map
/// size has reached `INCREMENTAL_RESIZE_THRESHOLD`.
const INCREMENTAL_RESIZE_STEP: usize = 5_242_880;
104
/// The RKV disk page size.
const PAGE_SIZE: usize = 4096;
/// Mask selecting the offset-within-page bits of a size. It is derived from
/// `PAGE_SIZE` so the two constants cannot drift apart.
const PAGE_SIZE_MASK: usize = PAGE_SIZE - 1;

/// Round the size up to the nearest multiple of the page size.
///
/// A size that is already a multiple of the page size (including zero) is
/// returned unchanged.
///
/// Overflow is deliberately not treated as an error: for sizes within one
/// page of `usize::MAX` the result wraps around to zero. Wrapping arithmetic
/// is used explicitly so that this holds in every build profile instead of
/// panicking when overflow checks are enabled. The original rationale for
/// tolerating it is that, even if that happens (extremely unlikely though),
/// RKV will ignore a new size that is smaller than the current size.
///
/// E.g:
///   [    1 -  4096] ->  4096,
///   [ 4097 -  8192] ->  8192,
///   [ 8193 - 12288] -> 12288,
fn round_to_pagesize(size: usize) -> usize {
    // Adding (PAGE_SIZE - 1) carries into the next page unless `size` is
    // already aligned; clearing the low bits then snaps to the page boundary.
    size.wrapping_add(PAGE_SIZE_MASK) & !PAGE_SIZE_MASK
}
126
127 /// Kvstore employes two auto resizing strategies: active and passive resizing.
128 /// They work together to liberate consumers from having to guess the "proper"
129 /// size of the store upfront. See more detail about this in Bug 1543861.
130 ///
131 /// Active resizing that is performed at the store startup.
132 ///
133 /// It either increases the size in double, or by a constant size if its size
134 /// reaches INCREMENTAL_RESIZE_THRESHOLD.
135 ///
136 /// Note that on Linux / MAC OSX, the increased size would only take effect if
137 /// there is a write transaction committed afterwards.
active_resize(env: &Rkv) -> Result<(), StoreError>138 fn active_resize(env: &Rkv) -> Result<(), StoreError> {
139 let info = env.info()?;
140 let current_size = info.map_size();
141
142 let size = if current_size < INCREMENTAL_RESIZE_THRESHOLD {
143 current_size << 1
144 } else {
145 current_size + INCREMENTAL_RESIZE_STEP
146 };
147
148 env.set_map_size(size)?;
149 Ok(())
150 }
151
152 /// Passive resizing that is performed when the MAP_FULL error occurs. It
153 /// increases the store with a `wanted` size.
154 ///
155 /// Note that the `wanted` size must be rounded to a multiple of page size
156 /// by using `round_to_pagesize`.
passive_resize(env: &Rkv, wanted: usize) -> Result<(), StoreError>157 fn passive_resize(env: &Rkv, wanted: usize) -> Result<(), StoreError> {
158 let info = env.info()?;
159 let current_size = info.map_size();
160 env.set_map_size(current_size + wanted)?;
161 Ok(())
162 }
163
164 pub struct GetOrCreateTask {
165 callback: AtomicCell<Option<ThreadBoundRefPtr<nsIKeyValueDatabaseCallback>>>,
166 path: nsCString,
167 name: nsCString,
168 result: AtomicCell<Option<Result<RkvStoreTuple, KeyValueError>>>,
169 }
170
171 impl GetOrCreateTask {
new( callback: RefPtr<nsIKeyValueDatabaseCallback>, path: nsCString, name: nsCString, ) -> GetOrCreateTask172 pub fn new(
173 callback: RefPtr<nsIKeyValueDatabaseCallback>,
174 path: nsCString,
175 name: nsCString,
176 ) -> GetOrCreateTask {
177 GetOrCreateTask {
178 callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))),
179 path,
180 name,
181 result: AtomicCell::default(),
182 }
183 }
184
convert(&self, result: RkvStoreTuple) -> Result<RefPtr<KeyValueDatabase>, KeyValueError>185 fn convert(&self, result: RkvStoreTuple) -> Result<RefPtr<KeyValueDatabase>, KeyValueError> {
186 Ok(KeyValueDatabase::new(result.0, result.1)?)
187 }
188 }
189
190 impl Task for GetOrCreateTask {
run(&self)191 fn run(&self) {
192 // We do the work within a closure that returns a Result so we can
193 // use the ? operator to simplify the implementation.
194 self.result
195 .store(Some(|| -> Result<RkvStoreTuple, KeyValueError> {
196 let store;
197 let mut manager = Manager::singleton().write()?;
198 // Note that path canonicalization is diabled to work around crashes on Fennec:
199 // https://bugzilla.mozilla.org/show_bug.cgi?id=1531887
200 let path = Path::new(str::from_utf8(&self.path)?);
201 let rkv = manager.get_or_create(path, Rkv::new::<SafeMode>)?;
202 Migrator::easy_migrate_lmdb_to_safe_mode(path, rkv.read()?)?;
203 {
204 let env = rkv.read()?;
205 let load_ratio = env.load_ratio()?.unwrap_or(0.0);
206 if load_ratio > RESIZE_RATIO {
207 active_resize(&env)?;
208 }
209 store = env.open_single(str::from_utf8(&self.name)?, StoreOptions::create())?;
210 }
211 Ok((rkv, store))
212 }()));
213 }
214
215 task_done!(value);
216 }
217
218 pub struct PutTask {
219 callback: AtomicCell<Option<ThreadBoundRefPtr<nsIKeyValueVoidCallback>>>,
220 rkv: Arc<RwLock<Rkv>>,
221 store: SingleStore,
222 key: nsCString,
223 value: OwnedValue,
224 result: AtomicCell<Option<Result<(), KeyValueError>>>,
225 }
226
227 impl PutTask {
new( callback: RefPtr<nsIKeyValueVoidCallback>, rkv: Arc<RwLock<Rkv>>, store: SingleStore, key: nsCString, value: OwnedValue, ) -> PutTask228 pub fn new(
229 callback: RefPtr<nsIKeyValueVoidCallback>,
230 rkv: Arc<RwLock<Rkv>>,
231 store: SingleStore,
232 key: nsCString,
233 value: OwnedValue,
234 ) -> PutTask {
235 PutTask {
236 callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))),
237 rkv,
238 store,
239 key,
240 value,
241 result: AtomicCell::default(),
242 }
243 }
244 }
245
246 impl Task for PutTask {
run(&self)247 fn run(&self) {
248 // We do the work within a closure that returns a Result so we can
249 // use the ? operator to simplify the implementation.
250 self.result.store(Some(|| -> Result<(), KeyValueError> {
251 let env = self.rkv.read()?;
252 let key = str::from_utf8(&self.key)?;
253 let v = Value::from(&self.value);
254 let mut resized = false;
255
256 // Use a loop here in case we want to retry from a recoverable
257 // error such as `StoreError::MapFull`.
258 loop {
259 let mut writer = env.write()?;
260
261 match self.store.put(&mut writer, key, &v) {
262 Ok(_) => (),
263
264 // Only handle the first MapFull error via passive resizing.
265 // Propogate the subsequent MapFull error.
266 Err(StoreError::MapFull) if !resized => {
267 // abort the failed transaction for resizing.
268 writer.abort();
269
270 // calculate the size of pairs and resize the store accordingly.
271 let pair_size =
272 key.len() + v.serialized_size().map_err(StoreError::from)? as usize;
273 let wanted = round_to_pagesize(pair_size);
274 passive_resize(&env, wanted)?;
275 resized = true;
276 continue;
277 }
278
279 Err(err) => return Err(KeyValueError::StoreError(err)),
280 }
281
282 writer.commit()?;
283 break;
284 }
285
286 Ok(())
287 }()));
288 }
289
290 task_done!(void);
291 }
292
293 pub struct WriteManyTask {
294 callback: AtomicCell<Option<ThreadBoundRefPtr<nsIKeyValueVoidCallback>>>,
295 rkv: Arc<RwLock<Rkv>>,
296 store: SingleStore,
297 pairs: Vec<(nsCString, Option<OwnedValue>)>,
298 result: AtomicCell<Option<Result<(), KeyValueError>>>,
299 }
300
301 impl WriteManyTask {
new( callback: RefPtr<nsIKeyValueVoidCallback>, rkv: Arc<RwLock<Rkv>>, store: SingleStore, pairs: Vec<(nsCString, Option<OwnedValue>)>, ) -> WriteManyTask302 pub fn new(
303 callback: RefPtr<nsIKeyValueVoidCallback>,
304 rkv: Arc<RwLock<Rkv>>,
305 store: SingleStore,
306 pairs: Vec<(nsCString, Option<OwnedValue>)>,
307 ) -> WriteManyTask {
308 WriteManyTask {
309 callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))),
310 rkv,
311 store,
312 pairs,
313 result: AtomicCell::default(),
314 }
315 }
316
calc_pair_size(&self) -> Result<usize, StoreError>317 fn calc_pair_size(&self) -> Result<usize, StoreError> {
318 let mut total = 0;
319
320 for (key, value) in self.pairs.iter() {
321 if let Some(val) = value {
322 total += key.len();
323 total += Value::from(val)
324 .serialized_size()
325 .map_err(StoreError::from)? as usize;
326 }
327 }
328
329 Ok(total)
330 }
331 }
332
impl Task for WriteManyTask {
    // Applies every pair in `self.pairs` within one write transaction, which
    // is committed only after all pairs have been processed, and records the
    // outcome in `self.result`.
    fn run(&self) {
        // We do the work within a closure that returns a Result so we can
        // use the ? operator to simplify the implementation.
        self.result.store(Some(|| -> Result<(), KeyValueError> {
            let env = self.rkv.read()?;
            // Set once a passive resize has been attempted, so that we retry
            // at most one time.
            let mut resized = false;

            // Use a loop here in case we want to retry from a recoverable
            // error such as `StoreError::MapFull`.
            'outer: loop {
                // Each attempt starts over with a fresh write transaction.
                let mut writer = env.write()?;

                for (key, value) in self.pairs.iter() {
                    let key = str::from_utf8(key)?;
                    match value {
                        // To put.
                        Some(val) => {
                            match self.store.put(&mut writer, key, &Value::from(val)) {
                                Ok(_) => (),

                                // Only handle the first MapFull error via passive resizing.
                                // Propagate the subsequent MapFull error.
                                Err(StoreError::MapFull) if !resized => {
                                    // Abort the failed transaction for resizing.
                                    writer.abort();

                                    // Calculate the size of pairs and resize accordingly.
                                    // The size covers all puts in the batch, not just
                                    // the pair that failed.
                                    let pair_size = self.calc_pair_size()?;
                                    let wanted = round_to_pagesize(pair_size);
                                    passive_resize(&env, wanted)?;
                                    resized = true;
                                    // Restart the whole batch from the first pair.
                                    continue 'outer;
                                }

                                Err(err) => return Err(KeyValueError::StoreError(err)),
                            }
                        }
                        // To delete.
                        None => {
                            match self.store.delete(&mut writer, key) {
                                Ok(_) => (),

                                // RKV fails with an error if the key to delete wasn't found,
                                // and Rkv returns that error, but we ignore it, as we expect most
                                // of our consumers to want this behavior.
                                Err(StoreError::KeyValuePairNotFound) => (),

                                Err(err) => return Err(KeyValueError::StoreError(err)),
                            };
                        }
                    }
                }

                writer.commit()?;
                break; // 'outer: loop
            }

            Ok(())
        }()));
    }

    task_done!(void);
}
397
398 pub struct GetTask {
399 callback: AtomicCell<Option<ThreadBoundRefPtr<nsIKeyValueVariantCallback>>>,
400 rkv: Arc<RwLock<Rkv>>,
401 store: SingleStore,
402 key: nsCString,
403 default_value: Option<OwnedValue>,
404 result: AtomicCell<Option<Result<Option<OwnedValue>, KeyValueError>>>,
405 }
406
407 impl GetTask {
new( callback: RefPtr<nsIKeyValueVariantCallback>, rkv: Arc<RwLock<Rkv>>, store: SingleStore, key: nsCString, default_value: Option<OwnedValue>, ) -> GetTask408 pub fn new(
409 callback: RefPtr<nsIKeyValueVariantCallback>,
410 rkv: Arc<RwLock<Rkv>>,
411 store: SingleStore,
412 key: nsCString,
413 default_value: Option<OwnedValue>,
414 ) -> GetTask {
415 GetTask {
416 callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))),
417 rkv,
418 store,
419 key,
420 default_value,
421 result: AtomicCell::default(),
422 }
423 }
424
convert(&self, result: Option<OwnedValue>) -> Result<RefPtr<nsIVariant>, KeyValueError>425 fn convert(&self, result: Option<OwnedValue>) -> Result<RefPtr<nsIVariant>, KeyValueError> {
426 Ok(match result {
427 Some(val) => owned_to_variant(val)?,
428 None => ().into_variant(),
429 })
430 }
431 }
432
433 impl Task for GetTask {
run(&self)434 fn run(&self) {
435 // We do the work within a closure that returns a Result so we can
436 // use the ? operator to simplify the implementation.
437 self.result
438 .store(Some(|| -> Result<Option<OwnedValue>, KeyValueError> {
439 let key = str::from_utf8(&self.key)?;
440 let env = self.rkv.read()?;
441 let reader = env.read()?;
442 let value = self.store.get(&reader, key)?;
443
444 Ok(match value {
445 Some(value) => Some(OwnedValue::from(&value)),
446 None => match self.default_value {
447 Some(ref val) => Some(val.clone()),
448 None => None,
449 },
450 })
451 }()));
452 }
453
454 task_done!(value);
455 }
456
457 pub struct HasTask {
458 callback: AtomicCell<Option<ThreadBoundRefPtr<nsIKeyValueVariantCallback>>>,
459 rkv: Arc<RwLock<Rkv>>,
460 store: SingleStore,
461 key: nsCString,
462 result: AtomicCell<Option<Result<bool, KeyValueError>>>,
463 }
464
465 impl HasTask {
new( callback: RefPtr<nsIKeyValueVariantCallback>, rkv: Arc<RwLock<Rkv>>, store: SingleStore, key: nsCString, ) -> HasTask466 pub fn new(
467 callback: RefPtr<nsIKeyValueVariantCallback>,
468 rkv: Arc<RwLock<Rkv>>,
469 store: SingleStore,
470 key: nsCString,
471 ) -> HasTask {
472 HasTask {
473 callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))),
474 rkv,
475 store,
476 key,
477 result: AtomicCell::default(),
478 }
479 }
480
convert(&self, result: bool) -> Result<RefPtr<nsIVariant>, KeyValueError>481 fn convert(&self, result: bool) -> Result<RefPtr<nsIVariant>, KeyValueError> {
482 Ok(result.into_variant())
483 }
484 }
485
486 impl Task for HasTask {
run(&self)487 fn run(&self) {
488 // We do the work within a closure that returns a Result so we can
489 // use the ? operator to simplify the implementation.
490 self.result.store(Some(|| -> Result<bool, KeyValueError> {
491 let key = str::from_utf8(&self.key)?;
492 let env = self.rkv.read()?;
493 let reader = env.read()?;
494 let value = self.store.get(&reader, key)?;
495 Ok(value.is_some())
496 }()));
497 }
498
499 task_done!(value);
500 }
501
502 pub struct DeleteTask {
503 callback: AtomicCell<Option<ThreadBoundRefPtr<nsIKeyValueVoidCallback>>>,
504 rkv: Arc<RwLock<Rkv>>,
505 store: SingleStore,
506 key: nsCString,
507 result: AtomicCell<Option<Result<(), KeyValueError>>>,
508 }
509
510 impl DeleteTask {
new( callback: RefPtr<nsIKeyValueVoidCallback>, rkv: Arc<RwLock<Rkv>>, store: SingleStore, key: nsCString, ) -> DeleteTask511 pub fn new(
512 callback: RefPtr<nsIKeyValueVoidCallback>,
513 rkv: Arc<RwLock<Rkv>>,
514 store: SingleStore,
515 key: nsCString,
516 ) -> DeleteTask {
517 DeleteTask {
518 callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))),
519 rkv,
520 store,
521 key,
522 result: AtomicCell::default(),
523 }
524 }
525 }
526
527 impl Task for DeleteTask {
run(&self)528 fn run(&self) {
529 // We do the work within a closure that returns a Result so we can
530 // use the ? operator to simplify the implementation.
531 self.result.store(Some(|| -> Result<(), KeyValueError> {
532 let key = str::from_utf8(&self.key)?;
533 let env = self.rkv.read()?;
534 let mut writer = env.write()?;
535
536 match self.store.delete(&mut writer, key) {
537 Ok(_) => (),
538
539 // RKV fails with an error if the key to delete wasn't found,
540 // and Rkv returns that error, but we ignore it, as we expect most
541 // of our consumers to want this behavior.
542 Err(StoreError::KeyValuePairNotFound) => (),
543
544 Err(err) => return Err(KeyValueError::StoreError(err)),
545 };
546
547 writer.commit()?;
548
549 Ok(())
550 }()));
551 }
552
553 task_done!(void);
554 }
555
556 pub struct ClearTask {
557 callback: AtomicCell<Option<ThreadBoundRefPtr<nsIKeyValueVoidCallback>>>,
558 rkv: Arc<RwLock<Rkv>>,
559 store: SingleStore,
560 result: AtomicCell<Option<Result<(), KeyValueError>>>,
561 }
562
563 impl ClearTask {
new( callback: RefPtr<nsIKeyValueVoidCallback>, rkv: Arc<RwLock<Rkv>>, store: SingleStore, ) -> ClearTask564 pub fn new(
565 callback: RefPtr<nsIKeyValueVoidCallback>,
566 rkv: Arc<RwLock<Rkv>>,
567 store: SingleStore,
568 ) -> ClearTask {
569 ClearTask {
570 callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))),
571 rkv,
572 store,
573 result: AtomicCell::default(),
574 }
575 }
576 }
577
578 impl Task for ClearTask {
run(&self)579 fn run(&self) {
580 // We do the work within a closure that returns a Result so we can
581 // use the ? operator to simplify the implementation.
582 self.result.store(Some(|| -> Result<(), KeyValueError> {
583 let env = self.rkv.read()?;
584 let mut writer = env.write()?;
585 self.store.clear(&mut writer)?;
586 writer.commit()?;
587
588 Ok(())
589 }()));
590 }
591
592 task_done!(void);
593 }
594
595 pub struct EnumerateTask {
596 callback: AtomicCell<Option<ThreadBoundRefPtr<nsIKeyValueEnumeratorCallback>>>,
597 rkv: Arc<RwLock<Rkv>>,
598 store: SingleStore,
599 from_key: nsCString,
600 to_key: nsCString,
601 result: AtomicCell<Option<Result<Vec<KeyValuePairResult>, KeyValueError>>>,
602 }
603
604 impl EnumerateTask {
new( callback: RefPtr<nsIKeyValueEnumeratorCallback>, rkv: Arc<RwLock<Rkv>>, store: SingleStore, from_key: nsCString, to_key: nsCString, ) -> EnumerateTask605 pub fn new(
606 callback: RefPtr<nsIKeyValueEnumeratorCallback>,
607 rkv: Arc<RwLock<Rkv>>,
608 store: SingleStore,
609 from_key: nsCString,
610 to_key: nsCString,
611 ) -> EnumerateTask {
612 EnumerateTask {
613 callback: AtomicCell::new(Some(ThreadBoundRefPtr::new(callback))),
614 rkv,
615 store,
616 from_key,
617 to_key,
618 result: AtomicCell::default(),
619 }
620 }
621
convert( &self, result: Vec<KeyValuePairResult>, ) -> Result<RefPtr<KeyValueEnumerator>, KeyValueError>622 fn convert(
623 &self,
624 result: Vec<KeyValuePairResult>,
625 ) -> Result<RefPtr<KeyValueEnumerator>, KeyValueError> {
626 Ok(KeyValueEnumerator::new(result))
627 }
628 }
629
630 impl Task for EnumerateTask {
run(&self)631 fn run(&self) {
632 // We do the work within a closure that returns a Result so we can
633 // use the ? operator to simplify the implementation.
634 self.result.store(Some(
635 || -> Result<Vec<KeyValuePairResult>, KeyValueError> {
636 let env = self.rkv.read()?;
637 let reader = env.read()?;
638 let from_key = str::from_utf8(&self.from_key)?;
639 let to_key = str::from_utf8(&self.to_key)?;
640
641 let iterator = if from_key.is_empty() {
642 self.store.iter_start(&reader)?
643 } else {
644 self.store.iter_from(&reader, &from_key)?
645 };
646
647 // Ideally, we'd enumerate pairs lazily, as the consumer calls
648 // nsIKeyValueEnumerator.getNext(), which calls our
649 // KeyValueEnumerator.get_next() implementation. But KeyValueEnumerator
650 // can't reference the Iter because Rust "cannot #[derive(xpcom)]
651 // on a generic type," and the Iter requires a lifetime parameter,
652 // which would make KeyValueEnumerator generic.
653 //
654 // Our fallback approach is to eagerly collect the iterator
655 // into a collection that KeyValueEnumerator owns. Fixing this so we
656 // enumerate pairs lazily is bug 1499252.
657 let pairs: Vec<KeyValuePairResult> = iterator
658 // Convert the key to a string so we can compare it to the "to" key.
659 // For forward compatibility, we don't fail here if we can't convert
660 // a key to UTF-8. Instead, we store the Err in the collection
661 // and fail lazily in KeyValueEnumerator.get_next().
662 .map(|result| match result {
663 Ok((key, val)) => Ok((str::from_utf8(&key), val)),
664 Err(err) => Err(err),
665 })
666 // Stop iterating once we reach the to_key, if any.
667 .take_while(|result| match result {
668 Ok((key, _val)) => {
669 if to_key.is_empty() {
670 true
671 } else {
672 match *key {
673 Ok(key) => key < to_key,
674 Err(_err) => true,
675 }
676 }
677 }
678 Err(_) => true,
679 })
680 // Convert the key/value pair to owned.
681 .map(|result| match result {
682 Ok((key, val)) => match (key, val) {
683 (Ok(key), val) => Ok((key.to_owned(), OwnedValue::from(&val))),
684 (Err(err), _) => Err(err.into()),
685 },
686 Err(err) => Err(KeyValueError::StoreError(err)),
687 })
688 .collect();
689
690 Ok(pairs)
691 }(),
692 ));
693 }
694
695 task_done!(value);
696 }
697