1 /**
2  * @file   posix.cc
3  *
4  * @section LICENSE
5  *
6  * The MIT License
7  *
8  * @copyright Copyright (c) 2017-2021 TileDB, Inc.
9  *
10  * Permission is hereby granted, free of charge, to any person obtaining a copy
11  * of this software and associated documentation files (the "Software"), to deal
12  * in the Software without restriction, including without limitation the rights
13  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  * copies of the Software, and to permit persons to whom the Software is
15  * furnished to do so, subject to the following conditions:
16  *
17  * The above copyright notice and this permission notice shall be included in
18  * all copies or substantial portions of the Software.
19  *
20  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
23  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  * THE SOFTWARE.
27  *
28  * @section DESCRIPTION
29  *
30  * This file includes definitions of POSIX filesystem functions.
31  */
32 
33 #ifndef _WIN32
34 
35 #include "tiledb/sm/filesystem/posix.h"
36 #include "tiledb/common/logger.h"
37 #include "tiledb/common/thread_pool.h"
38 #include "tiledb/sm/misc/constants.h"
39 #include "tiledb/sm/misc/utils.h"
40 
41 #include <dirent.h>
42 #include <fcntl.h>
43 #include <limits.h>
44 #include <sys/stat.h>
45 #include <unistd.h>
46 
47 #include <fstream>
48 #include <future>
49 #include <iostream>
50 #include <queue>
51 #include <sstream>
52 
53 using namespace tiledb::common;
54 
55 namespace tiledb {
56 namespace sm {
57 
Posix()58 Posix::Posix()
59     : config_(default_config_) {
60 }
61 
both_slashes(char a,char b)62 bool Posix::both_slashes(char a, char b) {
63   return a == '/' && b == '/';
64 }
65 
read_all(int fd,void * buffer,uint64_t nbytes,uint64_t offset)66 uint64_t Posix::read_all(
67     int fd, void* buffer, uint64_t nbytes, uint64_t offset) {
68   auto bytes = reinterpret_cast<char*>(buffer);
69   uint64_t nread = 0;
70   do {
71     ssize_t actual_read =
72         ::pread(fd, bytes + nread, nbytes - nread, offset + nread);
73     if (actual_read == -1) {
74       LOG_STATUS(
75           Status::Error(std::string("POSIX pread error: ") + strerror(errno)));
76       return nread;
77     } else {
78       nread += actual_read;
79     }
80   } while (nread < nbytes);
81 
82   return nread;
83 }
84 
pwrite_all(int fd,uint64_t file_offset,const void * buffer,uint64_t nbytes)85 uint64_t Posix::pwrite_all(
86     int fd, uint64_t file_offset, const void* buffer, uint64_t nbytes) {
87   auto bytes = reinterpret_cast<const char*>(buffer);
88   uint64_t written = 0;
89   do {
90     ssize_t actual_written =
91         ::pwrite(fd, bytes + written, nbytes - written, file_offset + written);
92     if (actual_written == -1) {
93       LOG_STATUS(
94           Status::Error(std::string("POSIX write error: ") + strerror(errno)));
95       return written;
96     } else {
97       written += actual_written;
98     }
99   } while (written < nbytes);
100 
101   return written;
102 }
103 
adjacent_slashes_dedup(std::string * path)104 void Posix::adjacent_slashes_dedup(std::string* path) {
105   assert(utils::parse::starts_with(*path, "file://"));
106   path->erase(
107       std::unique(
108           path->begin() + std::string("file://").size(),
109           path->end(),
110           both_slashes),
111       path->end());
112 }
113 
abs_path(const std::string & path)114 std::string Posix::abs_path(const std::string& path) {
115   std::string resolved_path = abs_path_internal(path);
116 
117   // Ensure the returned has the same postfix slash as 'path'.
118   if (utils::parse::ends_with(path, "/")) {
119     if (!utils::parse::ends_with(resolved_path, "/")) {
120       resolved_path = resolved_path + "/";
121     }
122   } else {
123     if (utils::parse::ends_with(resolved_path, "/")) {
124       resolved_path = resolved_path.substr(0, resolved_path.length() - 1);
125     }
126   }
127 
128   return resolved_path;
129 }
130 
abs_path_internal(const std::string & path)131 std::string Posix::abs_path_internal(const std::string& path) {
132   // Initialize current, home and root
133   std::string current = current_dir();
134   auto env_home_ptr = getenv("HOME");
135   std::string home = env_home_ptr != nullptr ? env_home_ptr : current;
136   std::string root = "/";
137   std::string posix_prefix = "file://";
138 
139   // Easy cases
140   if (path.empty() || path == "." || path == "./")
141     return posix_prefix + current;
142   if (path == "~")
143     return posix_prefix + home;
144   if (path == "/")
145     return posix_prefix + root;
146 
147   // Other cases
148   std::string ret_dir;
149   if (utils::parse::starts_with(path, posix_prefix))
150     return path;
151   else if (utils::parse::starts_with(path, "/"))
152     ret_dir = posix_prefix + path;
153   else if (utils::parse::starts_with(path, "~/"))
154     ret_dir = posix_prefix + home + path.substr(1, path.size() - 1);
155   else if (utils::parse::starts_with(path, "./"))
156     ret_dir = posix_prefix + current + path.substr(1, path.size() - 1);
157   else
158     ret_dir = posix_prefix + current + "/" + path;
159 
160   adjacent_slashes_dedup(&ret_dir);
161   purge_dots_from_path(&ret_dir);
162 
163   return ret_dir;
164 }
165 
create_dir(const std::string & path) const166 Status Posix::create_dir(const std::string& path) const {
167   // If the directory does not exist, create it
168   if (is_dir(path)) {
169     return LOG_STATUS(Status::IOError(
170         std::string("Cannot create directory '") + path +
171         "'; Directory already exists"));
172   }
173 
174   uint32_t permissions = 0;
175   RETURN_NOT_OK(get_posix_directory_permissions(&permissions));
176 
177   if (mkdir(path.c_str(), permissions) != 0) {
178     return LOG_STATUS(Status::IOError(
179         std::string("Cannot create directory '") + path + "'; " +
180         strerror(errno)));
181   }
182   return Status::Ok();
183 }
184 
touch(const std::string & filename) const185 Status Posix::touch(const std::string& filename) const {
186   uint32_t permissions = 0;
187   RETURN_NOT_OK(get_posix_file_permissions(&permissions));
188 
189   int fd = ::open(filename.c_str(), O_WRONLY | O_CREAT | O_SYNC, permissions);
190   if (fd == -1 || ::close(fd) != 0) {
191     return LOG_STATUS(Status::IOError(
192         std::string("Failed to create file '") + filename + "'; " +
193         strerror(errno)));
194   }
195 
196   return Status::Ok();
197 }
198 
current_dir()199 std::string Posix::current_dir() {
200   std::string dir;
201   char* path = getcwd(nullptr, 0);
202   if (path != nullptr) {
203     dir = path;
204     free(path);
205   }
206   return dir;
207 }
208 
209 // TODO: it maybe better to use unlinkat for deeply nested recursive directories
210 // but the path name length limit in TileDB may make this unnecessary
unlink_cb(const char * fpath,const struct stat * sb,int typeflag,struct FTW * ftwbuf)211 int Posix::unlink_cb(
212     const char* fpath,
213     const struct stat* sb,
214     int typeflag,
215     struct FTW* ftwbuf) {
216   (void)sb;
217   (void)typeflag;
218   (void)ftwbuf;
219   int rc = remove(fpath);
220   if (rc)
221     perror(fpath);
222   return rc;
223 }
224 
remove_dir(const std::string & path) const225 Status Posix::remove_dir(const std::string& path) const {
226   int rc = nftw(path.c_str(), unlink_cb, 64, FTW_DEPTH | FTW_PHYS);
227   if (rc)
228     return LOG_STATUS(Status::IOError(
229         std::string("Failed to delete path '") + path + "';  " +
230         strerror(errno)));
231   return Status::Ok();
232 }
233 
remove_file(const std::string & path) const234 Status Posix::remove_file(const std::string& path) const {
235   if (remove(path.c_str()) != 0) {
236     return LOG_STATUS(Status::IOError(
237         std::string("Cannot delete file '") + path + "'; " + strerror(errno)));
238   }
239   return Status::Ok();
240 }
241 
file_size(const std::string & path,uint64_t * size) const242 Status Posix::file_size(const std::string& path, uint64_t* size) const {
243   int fd = open(path.c_str(), O_RDONLY);
244   if (fd == -1) {
245     return LOG_STATUS(Status::IOError(
246         "Cannot get file size of '" + path + "'; " + strerror(errno)));
247   }
248 
249   struct stat st;
250   fstat(fd, &st);
251   *size = (uint64_t)st.st_size;
252 
253   close(fd);
254   return Status::Ok();
255 }
256 
filelock_lock(const std::string & filename,filelock_t * fd,bool shared) const257 Status Posix::filelock_lock(
258     const std::string& filename, filelock_t* fd, bool shared) const {
259   // Prepare the flock struct
260   struct flock fl;
261   memset(&fl, 0, sizeof(struct flock));
262   if (shared)
263     fl.l_type = F_RDLCK;
264   else
265     fl.l_type = F_WRLCK;
266   fl.l_whence = SEEK_SET;
267   fl.l_start = 0;
268   fl.l_len = 0;
269   fl.l_pid = getpid();
270 
271   // Open the file
272   *fd = ::open(filename.c_str(), shared ? O_RDONLY : O_WRONLY);
273   if (*fd == -1) {
274     return LOG_STATUS(Status::IOError(
275         "Cannot open filelock '" + filename + "'; " + strerror(errno)));
276   }
277   // Acquire the lock
278   if (fcntl(*fd, F_SETLKW, &fl) == -1) {
279     return LOG_STATUS(Status::IOError(
280         "Cannot lock filelock '" + filename + "'; " + strerror(errno)));
281   }
282   return Status::Ok();
283 }
284 
filelock_unlock(filelock_t fd) const285 Status Posix::filelock_unlock(filelock_t fd) const {
286   if (::close(fd) == -1)
287     return LOG_STATUS(Status::IOError(
288         std::string("Cannot unlock filelock: ") + strerror(errno)));
289   return Status::Ok();
290 }
291 
init(const Config & config,ThreadPool * vfs_thread_pool)292 Status Posix::init(const Config& config, ThreadPool* vfs_thread_pool) {
293   if (vfs_thread_pool == nullptr) {
294     return LOG_STATUS(
295         Status::VFSError("Cannot initialize with null thread pool"));
296   }
297 
298   config_ = config;
299   vfs_thread_pool_ = vfs_thread_pool;
300 
301   return Status::Ok();
302 }
303 
is_dir(const std::string & path) const304 bool Posix::is_dir(const std::string& path) const {
305   struct stat st;
306   memset(&st, 0, sizeof(struct stat));
307   return stat(path.c_str(), &st) == 0 && S_ISDIR(st.st_mode);
308 }
309 
is_file(const std::string & path) const310 bool Posix::is_file(const std::string& path) const {
311   struct stat st;
312   memset(&st, 0, sizeof(struct stat));
313   return (stat(path.c_str(), &st) == 0) && !S_ISDIR(st.st_mode);
314 }
315 
ls(const std::string & path,std::vector<std::string> * paths) const316 Status Posix::ls(
317     const std::string& path, std::vector<std::string>* paths) const {
318   struct dirent* next_path = nullptr;
319   DIR* dir = opendir(path.c_str());
320   if (dir == nullptr) {
321     return Status::Ok();
322   }
323   while ((next_path = readdir(dir)) != nullptr) {
324     if (!strcmp(next_path->d_name, ".") || !strcmp(next_path->d_name, ".."))
325       continue;
326     std::string abspath = path + "/" + next_path->d_name;
327     paths->emplace_back(abspath);
328   }
329   // close parent directory
330   if (closedir(dir) != 0) {
331     return LOG_STATUS(Status::IOError(
332         std::string("Cannot close parent directory; ") + strerror(errno)));
333   }
334   return Status::Ok();
335 }
336 
move_path(const std::string & old_path,const std::string & new_path)337 Status Posix::move_path(
338     const std::string& old_path, const std::string& new_path) {
339   if (rename(old_path.c_str(), new_path.c_str()) != 0) {
340     return LOG_STATUS(
341         Status::IOError(std::string("Cannot move path: ") + strerror(errno)));
342   }
343   return Status::Ok();
344 }
345 
copy_file(const std::string & old_path,const std::string & new_path)346 Status Posix::copy_file(
347     const std::string& old_path, const std::string& new_path) {
348   std::ifstream src(old_path, std::ios::binary);
349   std::ofstream dst(new_path, std::ios::binary);
350   dst << src.rdbuf();
351   return Status::Ok();
352 }
353 
copy_dir(const std::string & old_path,const std::string & new_path)354 Status Posix::copy_dir(
355     const std::string& old_path, const std::string& new_path) {
356   RETURN_NOT_OK(create_dir(new_path));
357   std::vector<std::string> paths;
358   RETURN_NOT_OK(ls(old_path, &paths));
359 
360   std::queue<std::string> path_queue;
361   for (auto& path : paths)
362     path_queue.emplace(std::move(path));
363 
364   while (!path_queue.empty()) {
365     std::string file_name_abs = path_queue.front();
366     std::string file_name = file_name_abs.substr(old_path.length() + 1);
367     path_queue.pop();
368 
369     if (is_dir(file_name_abs)) {
370       RETURN_NOT_OK(create_dir(new_path + "/" + file_name));
371       std::vector<std::string> child_paths;
372       RETURN_NOT_OK(ls(file_name_abs, &child_paths));
373       for (auto& path : child_paths)
374         path_queue.emplace(std::move(path));
375     } else {
376       assert(is_file(file_name_abs));
377       RETURN_NOT_OK(
378           copy_file(old_path + "/" + file_name, new_path + "/" + file_name));
379     }
380   }
381 
382   return Status::Ok();
383 }
384 
purge_dots_from_path(std::string * path)385 void Posix::purge_dots_from_path(std::string* path) {
386   // Trivial case
387   if (path == nullptr)
388     return;
389 
390   // Trivial case
391   uint64_t path_size = path->size();
392   if (path_size == 0 || *path == "file:///")
393     return;
394 
395   assert(utils::parse::starts_with(*path, "file:///"));
396 
397   // Tokenize
398   const char* token_c_str = path->c_str() + 8;
399   std::vector<std::string> tokens, final_tokens;
400   std::string token;
401 
402   for (uint64_t i = 8; i < path_size; ++i) {
403     if ((*path)[i] == '/') {
404       (*path)[i] = '\0';
405       token = token_c_str;
406       if (!token.empty())
407         tokens.push_back(token);
408       token_c_str = path->c_str() + i + 1;
409     }
410   }
411   token = token_c_str;
412   if (!token.empty())
413     tokens.push_back(token);
414 
415   // Purge dots
416   for (auto& t : tokens) {
417     if (t == ".")  // Skip single dots
418       continue;
419 
420     if (t == "..") {
421       if (final_tokens.empty()) {
422         // Invalid path
423         *path = "";
424         return;
425       }
426 
427       final_tokens.pop_back();
428     } else {
429       final_tokens.push_back(t);
430     }
431   }
432 
433   // Assemble final path
434   *path = "file://";
435   for (auto& t : final_tokens)
436     *path += std::string("/") + t;
437 }
438 
read(const std::string & path,uint64_t offset,void * buffer,uint64_t nbytes) const439 Status Posix::read(
440     const std::string& path,
441     uint64_t offset,
442     void* buffer,
443     uint64_t nbytes) const {
444   // Checks
445   uint64_t file_size;
446   RETURN_NOT_OK(this->file_size(path, &file_size));
447   if (offset + nbytes > file_size)
448     return LOG_STATUS(
449         Status::IOError("Cannot read from file; Read exceeds file size"));
450 
451   // Open file
452   int fd = open(path.c_str(), O_RDONLY);
453   if (fd == -1) {
454     return LOG_STATUS(Status::IOError(
455         std::string("Cannot read from file; ") + strerror(errno)));
456   }
457   if (offset > static_cast<uint64_t>(std::numeric_limits<off_t>::max())) {
458     return LOG_STATUS(Status::IOError(
459         std::string("Cannot read from file ' ") + path.c_str() +
460         "'; offset > typemax(off_t)"));
461   }
462   if (nbytes > SSIZE_MAX) {
463     return LOG_STATUS(Status::IOError(
464         std::string("Cannot read from file ' ") + path.c_str() +
465         "'; nbytes > SSIZE_MAX"));
466   }
467   uint64_t bytes_read = read_all(fd, buffer, nbytes, offset);
468   if (bytes_read != nbytes) {
469     return LOG_STATUS(Status::IOError(
470         std::string("Cannot read from file '") + path.c_str() +
471         "'; File reading error"));
472   }
473   // Close file
474   if (close(fd)) {
475     return LOG_STATUS(Status::IOError(
476         std::string("Cannot read from file; ") + strerror(errno)));
477   }
478   return Status::Ok();
479 }
480 
sync(const std::string & path)481 Status Posix::sync(const std::string& path) {
482   uint32_t permissions = 0;
483 
484   // Open file
485   int fd = -1;
486   if (is_dir(path)) {  // DIRECTORY
487     RETURN_NOT_OK(get_posix_directory_permissions(&permissions));
488     fd = open(path.c_str(), O_RDONLY, permissions);
489   } else if (is_file(path)) {  // FILE
490     RETURN_NOT_OK(get_posix_file_permissions(&permissions));
491     fd = open(path.c_str(), O_WRONLY | O_APPEND | O_CREAT, permissions);
492   } else
493     return Status::Ok();  // If file does not exist, exit
494 
495   // Handle error
496   if (fd == -1) {
497     return LOG_STATUS(Status::IOError(
498         std::string("Cannot open file '") + path + "' for syncing; " +
499         strerror(errno)));
500   }
501 
502   // Sync
503   if (fsync(fd) != 0) {
504     return LOG_STATUS(Status::IOError(
505         std::string("Cannot sync file '") + path + "'; " + strerror(errno)));
506   }
507 
508   // Close file
509   if (close(fd) != 0) {
510     return LOG_STATUS(Status::IOError(
511         std::string("Cannot close synced file '") + path + "'; " +
512         strerror(errno)));
513   }
514 
515   // Success
516   return Status::Ok();
517 }
518 
write(const std::string & path,const void * buffer,uint64_t buffer_size)519 Status Posix::write(
520     const std::string& path, const void* buffer, uint64_t buffer_size) {
521   // Get config params
522   bool found = false;
523   uint64_t min_parallel_size = 0;
524   uint64_t max_parallel_ops = 0;
525   RETURN_NOT_OK(config_.get().get<uint64_t>(
526       "vfs.min_parallel_size", &min_parallel_size, &found));
527   assert(found);
528   RETURN_NOT_OK(config_.get().get<uint64_t>(
529       "vfs.file.max_parallel_ops", &max_parallel_ops, &found));
530   assert(found);
531 
532   uint32_t permissions = 0;
533   RETURN_NOT_OK(get_posix_file_permissions(&permissions));
534 
535   // Get file offset (equal to file size)
536   Status st;
537   uint64_t file_offset = 0;
538   if (is_file(path)) {
539     st = file_size(path, &file_offset);
540     if (!st.ok()) {
541       std::stringstream errmsg;
542       errmsg << "Cannot write to file '" << path << "'; " << st.message();
543       return LOG_STATUS(Status::IOError(errmsg.str()));
544     }
545   }
546 
547   // Open or create file.
548   int fd = open(path.c_str(), O_WRONLY | O_CREAT, permissions);
549   if (fd == -1) {
550     return LOG_STATUS(Status::IOError(
551         std::string("Cannot open file '") + path + "'; " + strerror(errno)));
552   }
553 
554   // Ensure that each thread is responsible for at least min_parallel_size
555   // bytes, and cap the number of parallel operations at the thread pool size.
556   uint64_t num_ops = std::min(
557       std::max(buffer_size / min_parallel_size, uint64_t(1)), max_parallel_ops);
558   if (num_ops == 1) {
559     st = write_at(fd, file_offset, buffer, buffer_size);
560     if (!st.ok()) {
561       close(fd);
562       std::stringstream errmsg;
563       errmsg << "Cannot write to file '" << path << "'; " << st.message();
564       return LOG_STATUS(Status::IOError(errmsg.str()));
565     }
566   } else {
567     std::vector<ThreadPool::Task> results;
568     uint64_t thread_write_nbytes = utils::math::ceil(buffer_size, num_ops);
569     for (uint64_t i = 0; i < num_ops; i++) {
570       uint64_t begin = i * thread_write_nbytes,
571                end =
572                    std::min((i + 1) * thread_write_nbytes - 1, buffer_size - 1);
573       uint64_t thread_nbytes = end - begin + 1;
574       uint64_t thread_file_offset = file_offset + begin;
575       auto thread_buffer = reinterpret_cast<const char*>(buffer) + begin;
576       results.emplace_back(vfs_thread_pool_->execute(
577           [fd, thread_file_offset, thread_buffer, thread_nbytes]() {
578             return write_at(
579                 fd, thread_file_offset, thread_buffer, thread_nbytes);
580           }));
581     }
582     st = vfs_thread_pool_->wait_all(results);
583     if (!st.ok()) {
584       close(fd);
585       std::stringstream errmsg;
586       errmsg << "Cannot write to file '" << path << "'; " << st.message();
587       return LOG_STATUS(Status::IOError(errmsg.str()));
588     }
589   }
590   if (close(fd) != 0) {
591     return LOG_STATUS(Status::IOError(
592         std::string("Cannot close file '") + path + "'; " + strerror(errno)));
593   }
594   return st;
595 }
596 
write_at(int fd,uint64_t file_offset,const void * buffer,uint64_t buffer_size)597 Status Posix::write_at(
598     int fd, uint64_t file_offset, const void* buffer, uint64_t buffer_size) {
599   // Append data to the file in batches of constants::max_write_bytes
600   // bytes at a time
601   uint64_t buffer_bytes_written = 0;
602   const char* buffer_bytes_ptr = static_cast<const char*>(buffer);
603   while (buffer_size > constants::max_write_bytes) {
604     uint64_t bytes_written = pwrite_all(
605         fd,
606         file_offset + buffer_bytes_written,
607         buffer_bytes_ptr + buffer_bytes_written,
608         constants::max_write_bytes);
609     if (bytes_written != constants::max_write_bytes) {
610       return LOG_STATUS(Status::IOError(
611           std::string("Cannot write to file; File writing error")));
612     }
613     buffer_bytes_written += bytes_written;
614     buffer_size -= bytes_written;
615   }
616   uint64_t bytes_written = pwrite_all(
617       fd,
618       file_offset + buffer_bytes_written,
619       buffer_bytes_ptr + buffer_bytes_written,
620       buffer_size);
621   if (bytes_written != buffer_size) {
622     return LOG_STATUS(Status::IOError(
623         std::string("Cannot write to file; File writing error")));
624   }
625   return Status::Ok();
626 }
627 
get_posix_file_permissions(uint32_t * permissions) const628 Status Posix::get_posix_file_permissions(uint32_t* permissions) const {
629   // Get config params
630   bool found = false;
631   std::string posix_permissions =
632       config_.get().get("vfs.file.posix_file_permissions", &found);
633   assert(found);
634 
635   // Permissions are passed in octal notation by the user
636   *permissions = std::strtol(posix_permissions.c_str(), NULL, 8);
637 
638   return Status::Ok();
639 }
640 
get_posix_directory_permissions(uint32_t * permissions) const641 Status Posix::get_posix_directory_permissions(uint32_t* permissions) const {
642   // Get config params
643   bool found = false;
644   std::string posix_permissions =
645       config_.get().get("vfs.file.posix_directory_permissions", &found);
646   assert(found);
647 
648   // Permissions are passed in octal notation by the user
649   *permissions = std::strtol(posix_permissions.c_str(), NULL, 8);
650 
651   return Status::Ok();
652 }
653 
654 }  // namespace sm
655 }  // namespace tiledb
656 
657 #endif  // !_WIN32
658