1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include <algorithm>
19 #include <iterator>
20 #include <map>
21 #include <mutex>
22 #include <sstream>
23 #include <string>
24 #include <utility>
25 #include <vector>
26
27 #include "arrow/buffer.h"
28 #include "arrow/buffer_builder.h"
29 #include "arrow/filesystem/mockfs.h"
30 #include "arrow/filesystem/path_util.h"
31 #include "arrow/filesystem/util_internal.h"
32 #include "arrow/io/interfaces.h"
33 #include "arrow/io/memory.h"
34 #include "arrow/util/async_generator.h"
35 #include "arrow/util/future.h"
36 #include "arrow/util/logging.h"
37 #include "arrow/util/string_view.h"
38 #include "arrow/util/variant.h"
39 #include "arrow/util/windows_fixup.h"
40
41 namespace arrow {
42 namespace fs {
43 namespace internal {
44
45 namespace {
46
47 ////////////////////////////////////////////////////////////////////////////
48 // Filesystem structure
49
50 class Entry;
51
52 struct File {
53 TimePoint mtime;
54 std::string name;
55 std::shared_ptr<Buffer> data;
56 std::shared_ptr<const KeyValueMetadata> metadata;
57
Filearrow::fs::internal::__anon5bf181920111::File58 File(TimePoint mtime, std::string name) : mtime(mtime), name(std::move(name)) {}
59
sizearrow::fs::internal::__anon5bf181920111::File60 int64_t size() const { return data ? data->size() : 0; }
61
operator util::string_viewarrow::fs::internal::__anon5bf181920111::File62 explicit operator util::string_view() const {
63 if (data) {
64 return util::string_view(*data);
65 } else {
66 return "";
67 }
68 }
69 };
70
71 struct Directory {
72 std::string name;
73 TimePoint mtime;
74 std::map<std::string, std::unique_ptr<Entry>> entries;
75
Directoryarrow::fs::internal::__anon5bf181920111::Directory76 Directory(std::string name, TimePoint mtime) : name(std::move(name)), mtime(mtime) {}
Directoryarrow::fs::internal::__anon5bf181920111::Directory77 Directory(Directory&& other) noexcept
78 : name(std::move(other.name)),
79 mtime(other.mtime),
80 entries(std::move(other.entries)) {}
81
operator =arrow::fs::internal::__anon5bf181920111::Directory82 Directory& operator=(Directory&& other) noexcept {
83 name = std::move(other.name);
84 mtime = other.mtime;
85 entries = std::move(other.entries);
86 return *this;
87 }
88
Findarrow::fs::internal::__anon5bf181920111::Directory89 Entry* Find(const std::string& s) {
90 auto it = entries.find(s);
91 if (it != entries.end()) {
92 return it->second.get();
93 } else {
94 return nullptr;
95 }
96 }
97
CreateEntryarrow::fs::internal::__anon5bf181920111::Directory98 bool CreateEntry(const std::string& s, std::unique_ptr<Entry> entry) {
99 DCHECK(!s.empty());
100 auto p = entries.emplace(s, std::move(entry));
101 return p.second;
102 }
103
AssignEntryarrow::fs::internal::__anon5bf181920111::Directory104 void AssignEntry(const std::string& s, std::unique_ptr<Entry> entry) {
105 DCHECK(!s.empty());
106 entries[s] = std::move(entry);
107 }
108
DeleteEntryarrow::fs::internal::__anon5bf181920111::Directory109 bool DeleteEntry(const std::string& s) { return entries.erase(s) > 0; }
110
111 private:
112 ARROW_DISALLOW_COPY_AND_ASSIGN(Directory);
113 };
114
115 // A filesystem entry
116 using EntryBase = util::Variant<std::nullptr_t, File, Directory>;
117
118 class Entry : public EntryBase {
119 public:
120 Entry(Entry&&) = default;
121 Entry& operator=(Entry&&) = default;
Entry(Directory && v)122 explicit Entry(Directory&& v) : EntryBase(std::move(v)) {}
Entry(File && v)123 explicit Entry(File&& v) : EntryBase(std::move(v)) {}
124
is_dir() const125 bool is_dir() const { return util::holds_alternative<Directory>(*this); }
126
is_file() const127 bool is_file() const { return util::holds_alternative<File>(*this); }
128
as_dir()129 Directory& as_dir() { return util::get<Directory>(*this); }
130
as_file()131 File& as_file() { return util::get<File>(*this); }
132
133 // Get info for this entry. Note the path() property isn't set.
GetInfo()134 FileInfo GetInfo() {
135 FileInfo info;
136 if (is_dir()) {
137 Directory& dir = as_dir();
138 info.set_type(FileType::Directory);
139 info.set_mtime(dir.mtime);
140 } else {
141 DCHECK(is_file());
142 File& file = as_file();
143 info.set_type(FileType::File);
144 info.set_mtime(file.mtime);
145 info.set_size(file.size());
146 }
147 return info;
148 }
149
150 // Get info for this entry, knowing the parent path.
GetInfo(const std::string & base_path)151 FileInfo GetInfo(const std::string& base_path) {
152 FileInfo info;
153 if (is_dir()) {
154 Directory& dir = as_dir();
155 info.set_type(FileType::Directory);
156 info.set_mtime(dir.mtime);
157 info.set_path(ConcatAbstractPath(base_path, dir.name));
158 } else {
159 DCHECK(is_file());
160 File& file = as_file();
161 info.set_type(FileType::File);
162 info.set_mtime(file.mtime);
163 info.set_size(file.size());
164 info.set_path(ConcatAbstractPath(base_path, file.name));
165 }
166 return info;
167 }
168
169 // Set the entry name
SetName(const std::string & name)170 void SetName(const std::string& name) {
171 if (is_dir()) {
172 as_dir().name = name;
173 } else {
174 DCHECK(is_file());
175 as_file().name = name;
176 }
177 }
178
179 private:
180 ARROW_DISALLOW_COPY_AND_ASSIGN(Entry);
181 };
182
183 ////////////////////////////////////////////////////////////////////////////
184 // Streams
185
186 class MockFSOutputStream : public io::OutputStream {
187 public:
MockFSOutputStream(File * file,MemoryPool * pool)188 MockFSOutputStream(File* file, MemoryPool* pool)
189 : file_(file), builder_(pool), closed_(false) {}
190
191 ~MockFSOutputStream() override = default;
192
193 // Implement the OutputStream interface
Close()194 Status Close() override {
195 if (!closed_) {
196 RETURN_NOT_OK(builder_.Finish(&file_->data));
197 closed_ = true;
198 }
199 return Status::OK();
200 }
201
Abort()202 Status Abort() override {
203 if (!closed_) {
204 // MockFSOutputStream is mainly used for debugging and testing, so
205 // mark an aborted file's contents explicitly.
206 std::stringstream ss;
207 ss << "MockFSOutputStream aborted after " << file_->size() << " bytes written";
208 file_->data = Buffer::FromString(ss.str());
209 closed_ = true;
210 }
211 return Status::OK();
212 }
213
closed() const214 bool closed() const override { return closed_; }
215
Tell() const216 Result<int64_t> Tell() const override {
217 if (closed_) {
218 return Status::Invalid("Invalid operation on closed stream");
219 }
220 return builder_.length();
221 }
222
Write(const void * data,int64_t nbytes)223 Status Write(const void* data, int64_t nbytes) override {
224 if (closed_) {
225 return Status::Invalid("Invalid operation on closed stream");
226 }
227 return builder_.Append(data, nbytes);
228 }
229
230 protected:
231 File* file_;
232 BufferBuilder builder_;
233 bool closed_;
234 };
235
236 class MockFSInputStream : public io::BufferReader {
237 public:
MockFSInputStream(const File & file)238 explicit MockFSInputStream(const File& file)
239 : io::BufferReader(file.data), metadata_(file.metadata) {}
240
ReadMetadata()241 Result<std::shared_ptr<const KeyValueMetadata>> ReadMetadata() override {
242 return metadata_;
243 }
244
245 protected:
246 std::shared_ptr<const KeyValueMetadata> metadata_;
247 };
248
249 } // namespace
250
operator <<(std::ostream & os,const MockDirInfo & di)251 std::ostream& operator<<(std::ostream& os, const MockDirInfo& di) {
252 return os << "'" << di.full_path << "' [mtime=" << di.mtime.time_since_epoch().count()
253 << "]";
254 }
255
operator <<(std::ostream & os,const MockFileInfo & di)256 std::ostream& operator<<(std::ostream& os, const MockFileInfo& di) {
257 return os << "'" << di.full_path << "' [mtime=" << di.mtime.time_since_epoch().count()
258 << ", size=" << di.data.length() << "]";
259 }
260
261 ////////////////////////////////////////////////////////////////////////////
262 // MockFileSystem implementation
263
264 class MockFileSystem::Impl {
265 public:
266 TimePoint current_time;
267 MemoryPool* pool;
268
269 // The root directory
270 Entry root;
271 std::mutex mutex;
272
Impl(TimePoint current_time,MemoryPool * pool)273 Impl(TimePoint current_time, MemoryPool* pool)
274 : current_time(current_time), pool(pool), root(Directory("", current_time)) {}
275
lock_guard()276 std::unique_lock<std::mutex> lock_guard() {
277 return std::unique_lock<std::mutex>(mutex);
278 }
279
RootDir()280 Directory& RootDir() { return root.as_dir(); }
281
282 template <typename It>
FindEntry(It it,It end,size_t * nconsumed)283 Entry* FindEntry(It it, It end, size_t* nconsumed) {
284 size_t consumed = 0;
285 Entry* entry = &root;
286
287 for (; it != end; ++it) {
288 const std::string& part = *it;
289 DCHECK(entry->is_dir());
290 Entry* child = entry->as_dir().Find(part);
291 if (child == nullptr) {
292 // Partial find only
293 break;
294 }
295 ++consumed;
296 entry = child;
297 if (entry->is_file()) {
298 // Cannot go any further
299 break;
300 }
301 // Recurse
302 }
303 *nconsumed = consumed;
304 return entry;
305 }
306
307 // Find an entry, allowing partial matching
FindEntry(const std::vector<std::string> & parts,size_t * nconsumed)308 Entry* FindEntry(const std::vector<std::string>& parts, size_t* nconsumed) {
309 return FindEntry(parts.begin(), parts.end(), nconsumed);
310 }
311
312 // Find an entry, only full matching allowed
FindEntry(const std::vector<std::string> & parts)313 Entry* FindEntry(const std::vector<std::string>& parts) {
314 size_t consumed;
315 auto entry = FindEntry(parts, &consumed);
316 return (consumed == parts.size()) ? entry : nullptr;
317 }
318
319 // Find the parent entry, only full matching allowed
FindParent(const std::vector<std::string> & parts)320 Entry* FindParent(const std::vector<std::string>& parts) {
321 if (parts.size() == 0) {
322 return nullptr;
323 }
324 size_t consumed;
325 auto entry = FindEntry(parts.begin(), --parts.end(), &consumed);
326 return (consumed == parts.size() - 1) ? entry : nullptr;
327 }
328
GatherInfos(const FileSelector & select,const std::string & base_path,const Directory & base_dir,int32_t nesting_depth,std::vector<FileInfo> * infos)329 void GatherInfos(const FileSelector& select, const std::string& base_path,
330 const Directory& base_dir, int32_t nesting_depth,
331 std::vector<FileInfo>* infos) {
332 for (const auto& pair : base_dir.entries) {
333 Entry* child = pair.second.get();
334 infos->push_back(child->GetInfo(base_path));
335 if (select.recursive && nesting_depth < select.max_recursion && child->is_dir()) {
336 Directory& child_dir = child->as_dir();
337 std::string child_path = infos->back().path();
338 GatherInfos(select, std::move(child_path), child_dir, nesting_depth + 1, infos);
339 }
340 }
341 }
342
DumpDirs(const std::string & prefix,const Directory & dir,std::vector<MockDirInfo> * out)343 void DumpDirs(const std::string& prefix, const Directory& dir,
344 std::vector<MockDirInfo>* out) {
345 std::string path = prefix + dir.name;
346 if (!path.empty()) {
347 out->push_back({path, dir.mtime});
348 path += "/";
349 }
350 for (const auto& pair : dir.entries) {
351 Entry* child = pair.second.get();
352 if (child->is_dir()) {
353 DumpDirs(path, child->as_dir(), out);
354 }
355 }
356 }
357
DumpFiles(const std::string & prefix,const Directory & dir,std::vector<MockFileInfo> * out)358 void DumpFiles(const std::string& prefix, const Directory& dir,
359 std::vector<MockFileInfo>* out) {
360 std::string path = prefix + dir.name;
361 if (!path.empty()) {
362 path += "/";
363 }
364 for (const auto& pair : dir.entries) {
365 Entry* child = pair.second.get();
366 if (child->is_file()) {
367 auto& file = child->as_file();
368 out->push_back({path + file.name, file.mtime, util::string_view(file)});
369 } else if (child->is_dir()) {
370 DumpFiles(path, child->as_dir(), out);
371 }
372 }
373 }
374
OpenOutputStream(const std::string & path,bool append,const std::shared_ptr<const KeyValueMetadata> & metadata)375 Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
376 const std::string& path, bool append,
377 const std::shared_ptr<const KeyValueMetadata>& metadata) {
378 auto parts = SplitAbstractPath(path);
379 RETURN_NOT_OK(ValidateAbstractPathParts(parts));
380
381 Entry* parent = FindParent(parts);
382 if (parent == nullptr || !parent->is_dir()) {
383 return PathNotFound(path);
384 }
385 // Find the file in the parent dir, or create it
386 const auto& name = parts.back();
387 Entry* child = parent->as_dir().Find(name);
388 File* file;
389 if (child == nullptr) {
390 child = new Entry(File(current_time, name));
391 parent->as_dir().AssignEntry(name, std::unique_ptr<Entry>(child));
392 file = &child->as_file();
393 } else if (child->is_file()) {
394 file = &child->as_file();
395 file->mtime = current_time;
396 } else {
397 return NotAFile(path);
398 }
399 file->metadata = metadata;
400 auto ptr = std::make_shared<MockFSOutputStream>(file, pool);
401 if (append && file->data) {
402 RETURN_NOT_OK(ptr->Write(file->data->data(), file->data->size()));
403 }
404 return ptr;
405 }
406
OpenInputReader(const std::string & path)407 Result<std::shared_ptr<io::BufferReader>> OpenInputReader(const std::string& path) {
408 auto parts = SplitAbstractPath(path);
409 RETURN_NOT_OK(ValidateAbstractPathParts(parts));
410
411 Entry* entry = FindEntry(parts);
412 if (entry == nullptr) {
413 return PathNotFound(path);
414 }
415 if (!entry->is_file()) {
416 return NotAFile(path);
417 }
418 return std::make_shared<MockFSInputStream>(entry->as_file());
419 }
420 };
421
422 MockFileSystem::~MockFileSystem() = default;
423
MockFileSystem(TimePoint current_time,const io::IOContext & io_context)424 MockFileSystem::MockFileSystem(TimePoint current_time, const io::IOContext& io_context) {
425 impl_ = std::unique_ptr<Impl>(new Impl(current_time, io_context.pool()));
426 }
427
Equals(const FileSystem & other) const428 bool MockFileSystem::Equals(const FileSystem& other) const { return this == &other; }
429
CreateDir(const std::string & path,bool recursive)430 Status MockFileSystem::CreateDir(const std::string& path, bool recursive) {
431 auto parts = SplitAbstractPath(path);
432 RETURN_NOT_OK(ValidateAbstractPathParts(parts));
433
434 auto guard = impl_->lock_guard();
435
436 size_t consumed;
437 Entry* entry = impl_->FindEntry(parts, &consumed);
438 if (!entry->is_dir()) {
439 auto file_path = JoinAbstractPath(parts.begin(), parts.begin() + consumed);
440 return Status::IOError("Cannot create directory '", path, "': ", "ancestor '",
441 file_path, "' is not a directory");
442 }
443 if (!recursive && (parts.size() - consumed) > 1) {
444 return Status::IOError("Cannot create directory '", path,
445 "': ", "parent does not exist");
446 }
447 for (size_t i = consumed; i < parts.size(); ++i) {
448 const auto& name = parts[i];
449 std::unique_ptr<Entry> child(new Entry(Directory(name, impl_->current_time)));
450 Entry* child_ptr = child.get();
451 bool inserted = entry->as_dir().CreateEntry(name, std::move(child));
452 // No race condition on insertion is possible, as all operations are locked
453 DCHECK(inserted);
454 entry = child_ptr;
455 }
456 return Status::OK();
457 }
458
DeleteDir(const std::string & path)459 Status MockFileSystem::DeleteDir(const std::string& path) {
460 auto parts = SplitAbstractPath(path);
461 RETURN_NOT_OK(ValidateAbstractPathParts(parts));
462
463 auto guard = impl_->lock_guard();
464
465 Entry* parent = impl_->FindParent(parts);
466 if (parent == nullptr || !parent->is_dir()) {
467 return PathNotFound(path);
468 }
469 Directory& parent_dir = parent->as_dir();
470 auto child = parent_dir.Find(parts.back());
471 if (child == nullptr) {
472 return PathNotFound(path);
473 }
474 if (!child->is_dir()) {
475 return NotADir(path);
476 }
477
478 bool deleted = parent_dir.DeleteEntry(parts.back());
479 DCHECK(deleted);
480 return Status::OK();
481 }
482
DeleteDirContents(const std::string & path)483 Status MockFileSystem::DeleteDirContents(const std::string& path) {
484 auto parts = SplitAbstractPath(path);
485 RETURN_NOT_OK(ValidateAbstractPathParts(parts));
486
487 auto guard = impl_->lock_guard();
488
489 if (parts.empty()) {
490 // Wipe filesystem
491 return internal::InvalidDeleteDirContents(path);
492 }
493
494 Entry* entry = impl_->FindEntry(parts);
495 if (entry == nullptr) {
496 return PathNotFound(path);
497 }
498 if (!entry->is_dir()) {
499 return NotADir(path);
500 }
501 entry->as_dir().entries.clear();
502 return Status::OK();
503 }
504
DeleteRootDirContents()505 Status MockFileSystem::DeleteRootDirContents() {
506 auto guard = impl_->lock_guard();
507
508 impl_->RootDir().entries.clear();
509 return Status::OK();
510 }
511
DeleteFile(const std::string & path)512 Status MockFileSystem::DeleteFile(const std::string& path) {
513 auto parts = SplitAbstractPath(path);
514 RETURN_NOT_OK(ValidateAbstractPathParts(parts));
515
516 auto guard = impl_->lock_guard();
517
518 Entry* parent = impl_->FindParent(parts);
519 if (parent == nullptr || !parent->is_dir()) {
520 return PathNotFound(path);
521 }
522 Directory& parent_dir = parent->as_dir();
523 auto child = parent_dir.Find(parts.back());
524 if (child == nullptr) {
525 return PathNotFound(path);
526 }
527 if (!child->is_file()) {
528 return NotAFile(path);
529 }
530 bool deleted = parent_dir.DeleteEntry(parts.back());
531 DCHECK(deleted);
532 return Status::OK();
533 }
534
GetFileInfo(const std::string & path)535 Result<FileInfo> MockFileSystem::GetFileInfo(const std::string& path) {
536 auto parts = SplitAbstractPath(path);
537 RETURN_NOT_OK(ValidateAbstractPathParts(parts));
538
539 auto guard = impl_->lock_guard();
540
541 FileInfo info;
542 Entry* entry = impl_->FindEntry(parts);
543 if (entry == nullptr) {
544 info.set_type(FileType::NotFound);
545 } else {
546 info = entry->GetInfo();
547 }
548 info.set_path(path);
549 return info;
550 }
551
GetFileInfo(const FileSelector & selector)552 Result<FileInfoVector> MockFileSystem::GetFileInfo(const FileSelector& selector) {
553 auto parts = SplitAbstractPath(selector.base_dir);
554 RETURN_NOT_OK(ValidateAbstractPathParts(parts));
555
556 auto guard = impl_->lock_guard();
557
558 FileInfoVector results;
559
560 Entry* base_dir = impl_->FindEntry(parts);
561 if (base_dir == nullptr) {
562 // Base directory does not exist
563 if (selector.allow_not_found) {
564 return results;
565 } else {
566 return PathNotFound(selector.base_dir);
567 }
568 }
569 if (!base_dir->is_dir()) {
570 return NotADir(selector.base_dir);
571 }
572
573 impl_->GatherInfos(selector, selector.base_dir, base_dir->as_dir(), 0, &results);
574 return results;
575 }
576
577 namespace {
578
579 // Helper for binary operations (move, copy)
580 struct BinaryOp {
581 std::vector<std::string> src_parts;
582 std::vector<std::string> dest_parts;
583 Directory& src_dir;
584 Directory& dest_dir;
585 std::string src_name;
586 std::string dest_name;
587 Entry* src_entry;
588 Entry* dest_entry;
589
590 template <typename OpFunc>
Runarrow::fs::internal::__anon5bf181920211::BinaryOp591 static Status Run(MockFileSystem::Impl* impl, const std::string& src,
592 const std::string& dest, OpFunc&& op_func) {
593 auto src_parts = SplitAbstractPath(src);
594 auto dest_parts = SplitAbstractPath(dest);
595 RETURN_NOT_OK(ValidateAbstractPathParts(src_parts));
596 RETURN_NOT_OK(ValidateAbstractPathParts(dest_parts));
597
598 auto guard = impl->lock_guard();
599
600 // Both source and destination must have valid parents
601 Entry* src_parent = impl->FindParent(src_parts);
602 if (src_parent == nullptr || !src_parent->is_dir()) {
603 return PathNotFound(src);
604 }
605 Entry* dest_parent = impl->FindParent(dest_parts);
606 if (dest_parent == nullptr || !dest_parent->is_dir()) {
607 return PathNotFound(dest);
608 }
609 Directory& src_dir = src_parent->as_dir();
610 Directory& dest_dir = dest_parent->as_dir();
611 DCHECK_GE(src_parts.size(), 1);
612 DCHECK_GE(dest_parts.size(), 1);
613 const auto& src_name = src_parts.back();
614 const auto& dest_name = dest_parts.back();
615
616 BinaryOp op{std::move(src_parts),
617 std::move(dest_parts),
618 src_dir,
619 dest_dir,
620 src_name,
621 dest_name,
622 src_dir.Find(src_name),
623 dest_dir.Find(dest_name)};
624
625 return op_func(std::move(op));
626 }
627 };
628
629 } // namespace
630
Move(const std::string & src,const std::string & dest)631 Status MockFileSystem::Move(const std::string& src, const std::string& dest) {
632 return BinaryOp::Run(impl_.get(), src, dest, [&](const BinaryOp& op) -> Status {
633 if (op.src_entry == nullptr) {
634 return PathNotFound(src);
635 }
636 if (op.dest_entry != nullptr) {
637 if (op.dest_entry->is_dir()) {
638 return Status::IOError("Cannot replace destination '", dest,
639 "', which is a directory");
640 }
641 if (op.dest_entry->is_file() && op.src_entry->is_dir()) {
642 return Status::IOError("Cannot replace destination '", dest,
643 "', which is a file, with directory '", src, "'");
644 }
645 }
646 if (op.src_parts.size() < op.dest_parts.size()) {
647 // Check if dest is a child of src
648 auto p =
649 std::mismatch(op.src_parts.begin(), op.src_parts.end(), op.dest_parts.begin());
650 if (p.first == op.src_parts.end()) {
651 return Status::IOError("Cannot move '", src, "' into child path '", dest, "'");
652 }
653 }
654
655 // Move original entry, fix its name
656 std::unique_ptr<Entry> new_entry(new Entry(std::move(*op.src_entry)));
657 new_entry->SetName(op.dest_name);
658 bool deleted = op.src_dir.DeleteEntry(op.src_name);
659 DCHECK(deleted);
660 op.dest_dir.AssignEntry(op.dest_name, std::move(new_entry));
661 return Status::OK();
662 });
663 }
664
CopyFile(const std::string & src,const std::string & dest)665 Status MockFileSystem::CopyFile(const std::string& src, const std::string& dest) {
666 return BinaryOp::Run(impl_.get(), src, dest, [&](const BinaryOp& op) -> Status {
667 if (op.src_entry == nullptr) {
668 return PathNotFound(src);
669 }
670 if (!op.src_entry->is_file()) {
671 return NotAFile(src);
672 }
673 if (op.dest_entry != nullptr && op.dest_entry->is_dir()) {
674 return Status::IOError("Cannot replace destination '", dest,
675 "', which is a directory");
676 }
677
678 // Copy original entry, fix its name
679 std::unique_ptr<Entry> new_entry(new Entry(File(op.src_entry->as_file())));
680 new_entry->SetName(op.dest_name);
681 op.dest_dir.AssignEntry(op.dest_name, std::move(new_entry));
682 return Status::OK();
683 });
684 }
685
OpenInputStream(const std::string & path)686 Result<std::shared_ptr<io::InputStream>> MockFileSystem::OpenInputStream(
687 const std::string& path) {
688 auto guard = impl_->lock_guard();
689
690 return impl_->OpenInputReader(path);
691 }
692
OpenInputFile(const std::string & path)693 Result<std::shared_ptr<io::RandomAccessFile>> MockFileSystem::OpenInputFile(
694 const std::string& path) {
695 auto guard = impl_->lock_guard();
696
697 return impl_->OpenInputReader(path);
698 }
699
OpenOutputStream(const std::string & path,const std::shared_ptr<const KeyValueMetadata> & metadata)700 Result<std::shared_ptr<io::OutputStream>> MockFileSystem::OpenOutputStream(
701 const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
702 auto guard = impl_->lock_guard();
703
704 return impl_->OpenOutputStream(path, /*append=*/false, metadata);
705 }
706
OpenAppendStream(const std::string & path,const std::shared_ptr<const KeyValueMetadata> & metadata)707 Result<std::shared_ptr<io::OutputStream>> MockFileSystem::OpenAppendStream(
708 const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
709 auto guard = impl_->lock_guard();
710
711 return impl_->OpenOutputStream(path, /*append=*/true, metadata);
712 }
713
AllDirs()714 std::vector<MockDirInfo> MockFileSystem::AllDirs() {
715 auto guard = impl_->lock_guard();
716
717 std::vector<MockDirInfo> result;
718 impl_->DumpDirs("", impl_->RootDir(), &result);
719 return result;
720 }
721
AllFiles()722 std::vector<MockFileInfo> MockFileSystem::AllFiles() {
723 auto guard = impl_->lock_guard();
724
725 std::vector<MockFileInfo> result;
726 impl_->DumpFiles("", impl_->RootDir(), &result);
727 return result;
728 }
729
CreateFile(const std::string & path,util::string_view contents,bool recursive)730 Status MockFileSystem::CreateFile(const std::string& path, util::string_view contents,
731 bool recursive) {
732 auto parent = fs::internal::GetAbstractPathParent(path).first;
733
734 if (parent != "") {
735 RETURN_NOT_OK(CreateDir(parent, recursive));
736 }
737
738 ARROW_ASSIGN_OR_RAISE(auto file, OpenOutputStream(path));
739 RETURN_NOT_OK(file->Write(contents));
740 return file->Close();
741 }
742
Make(TimePoint current_time,const std::vector<FileInfo> & infos)743 Result<std::shared_ptr<FileSystem>> MockFileSystem::Make(
744 TimePoint current_time, const std::vector<FileInfo>& infos) {
745 auto fs = std::make_shared<MockFileSystem>(current_time);
746 for (const auto& info : infos) {
747 switch (info.type()) {
748 case FileType::Directory:
749 RETURN_NOT_OK(fs->CreateDir(info.path(), /*recursive*/ true));
750 break;
751 case FileType::File:
752 RETURN_NOT_OK(fs->CreateFile(info.path(), "", /*recursive*/ true));
753 break;
754 default:
755 break;
756 }
757 }
758
759 return fs;
760 }
761
GetFileInfoGenerator(const FileSelector & select)762 FileInfoGenerator MockAsyncFileSystem::GetFileInfoGenerator(const FileSelector& select) {
763 auto maybe_infos = GetFileInfo(select);
764 if (maybe_infos.ok()) {
765 // Return the FileInfo entries one by one
766 const auto& infos = *maybe_infos;
767 std::vector<FileInfoVector> chunks(infos.size());
768 std::transform(infos.begin(), infos.end(), chunks.begin(),
769 [](const FileInfo& info) { return FileInfoVector{info}; });
770 return MakeVectorGenerator(std::move(chunks));
771 } else {
772 return MakeFailingGenerator(maybe_infos);
773 }
774 }
775
776 } // namespace internal
777 } // namespace fs
778 } // namespace arrow
779