1 //
2 // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 #include "td/utils/algorithm.h"
8 #include "td/utils/common.h"
9 #include "td/utils/logging.h"
10 #include "td/utils/misc.h"
11 #include "td/utils/port/EventFd.h"
12 #include "td/utils/port/FileFd.h"
13 #include "td/utils/port/IoSlice.h"
14 #include "td/utils/port/path.h"
15 #include "td/utils/port/signals.h"
16 #include "td/utils/port/sleep.h"
17 #include "td/utils/port/thread.h"
18 #include "td/utils/port/thread_local.h"
19 #include "td/utils/Random.h"
20 #include "td/utils/ScopeGuard.h"
21 #include "td/utils/Slice.h"
22 #include "td/utils/SliceBuilder.h"
23 #include "td/utils/tests.h"
24 #include "td/utils/Time.h"
25 
26 #if TD_PORT_POSIX && !TD_THREAD_UNSUPPORTED
27 #include <algorithm>
28 #include <atomic>
29 #include <mutex>
30 
31 #include <pthread.h>
32 #include <signal.h>
33 #endif
34 
TEST(Port,files)35 TEST(Port, files) {
36   td::CSlice main_dir = "test_dir";
37   td::rmrf(main_dir).ignore();
38   ASSERT_TRUE(td::FileFd::open(main_dir, td::FileFd::Write).is_error());
39   ASSERT_TRUE(td::walk_path(main_dir, [](td::CSlice name, td::WalkPath::Type type) { UNREACHABLE(); }).is_error());
40   td::mkdir(main_dir).ensure();
41   td::mkdir(PSLICE() << main_dir << TD_DIR_SLASH << "A").ensure();
42   td::mkdir(PSLICE() << main_dir << TD_DIR_SLASH << "B").ensure();
43   td::mkdir(PSLICE() << main_dir << TD_DIR_SLASH << "B" << TD_DIR_SLASH << "D").ensure();
44   td::mkdir(PSLICE() << main_dir << TD_DIR_SLASH << "C").ensure();
45   ASSERT_TRUE(td::FileFd::open(main_dir, td::FileFd::Write).is_error());
46   td::string fd_path = PSTRING() << main_dir << TD_DIR_SLASH << "t.txt";
47   td::string fd2_path = PSTRING() << main_dir << TD_DIR_SLASH << "C" << TD_DIR_SLASH << "t2.txt";
48 
49   auto fd = td::FileFd::open(fd_path, td::FileFd::Write | td::FileFd::CreateNew).move_as_ok();
50   auto fd2 = td::FileFd::open(fd2_path, td::FileFd::Write | td::FileFd::CreateNew).move_as_ok();
51   fd2.close();
52 
53   int cnt = 0;
54   const int ITER_COUNT = 1000;
55   for (int i = 0; i < ITER_COUNT; i++) {
56     td::walk_path(main_dir, [&](td::CSlice name, td::WalkPath::Type type) {
57       if (type == td::WalkPath::Type::NotDir) {
58         ASSERT_TRUE(name == fd_path || name == fd2_path);
59       }
60       cnt++;
61     }).ensure();
62   }
63   ASSERT_EQ((5 * 2 + 2) * ITER_COUNT, cnt);
64   bool was_abort = false;
65   td::walk_path(main_dir, [&](td::CSlice name, td::WalkPath::Type type) {
66     CHECK(!was_abort);
67     if (type == td::WalkPath::Type::EnterDir && ends_with(name, PSLICE() << TD_DIR_SLASH << "B")) {
68       was_abort = true;
69       return td::WalkPath::Action::Abort;
70     }
71     return td::WalkPath::Action::Continue;
72   }).ensure();
73   CHECK(was_abort);
74 
75   cnt = 0;
76   bool is_first_dir = true;
77   td::walk_path(main_dir, [&](td::CSlice name, td::WalkPath::Type type) {
78     cnt++;
79     if (type == td::WalkPath::Type::EnterDir) {
80       if (is_first_dir) {
81         is_first_dir = false;
82       } else {
83         return td::WalkPath::Action::SkipDir;
84       }
85     }
86     return td::WalkPath::Action::Continue;
87   }).ensure();
88   ASSERT_EQ(6, cnt);
89 
90   ASSERT_EQ(0u, fd.get_size().move_as_ok());
91   ASSERT_EQ(12u, fd.write("Hello world!").move_as_ok());
92   ASSERT_EQ(4u, fd.pwrite("abcd", 1).move_as_ok());
93   char buf[100];
94   td::MutableSlice buf_slice(buf, sizeof(buf));
95   ASSERT_TRUE(fd.pread(buf_slice.substr(0, 4), 2).is_error());
96   fd.seek(11).ensure();
97   ASSERT_EQ(2u, fd.write("?!").move_as_ok());
98 
99   ASSERT_TRUE(td::FileFd::open(main_dir, td::FileFd::Read | td::FileFd::CreateNew).is_error());
100   fd = td::FileFd::open(fd_path, td::FileFd::Read | td::FileFd::Create).move_as_ok();
101   ASSERT_EQ(13u, fd.get_size().move_as_ok());
102   ASSERT_EQ(4u, fd.pread(buf_slice.substr(0, 4), 1).move_as_ok());
103   ASSERT_STREQ("abcd", buf_slice.substr(0, 4));
104 
105   fd.seek(0).ensure();
106   ASSERT_EQ(13u, fd.read(buf_slice.substr(0, 13)).move_as_ok());
107   ASSERT_STREQ("Habcd world?!", buf_slice.substr(0, 13));
108 }
109 
// Writes a single byte far beyond the start of a freshly created file and logs
// an error when the real size reported for the file is not smaller than its
// logical size, i.e. when no hole was left before the written byte.
TEST(Port, SparseFiles) {
  const td::CSlice sparse_path = "sparse.txt";
  td::unlink(sparse_path).ignore();  // drop a leftover from a previous run, if any

  auto file = td::FileFd::open(sparse_path, td::FileFd::Write | td::FileFd::CreateNew).move_as_ok();
  ASSERT_EQ(0, file.get_size().move_as_ok());

  const td::int64 write_offset = 100000000;
  file.pwrite("a", write_offset).ensure();

  // One byte written at write_offset makes the logical size write_offset + 1.
  const td::int64 logical_size = write_offset + 1;
  ASSERT_EQ(logical_size, file.get_size().move_as_ok());

  if (file.get_real_size().move_as_ok() >= logical_size) {
    LOG(ERROR) << "File system doesn't support sparse files, rewind during streaming can be slow";
  }

  td::unlink(sparse_path).ensure();
}
124 
// Checks that FileFd::writev writes all slices of a vector in order (including
// an empty slice in the middle) and reports the total number of bytes written.
// The scratch file is removed at the end so the test leaves nothing behind.
TEST(Port, Writev) {
  td::vector<td::IoSlice> vec;
  td::CSlice test_file_path = "test.txt";
  // Drop a leftover from a previous run, if any; CreateNew below requires a fresh file.
  td::unlink(test_file_path).ignore();
  auto fd = td::FileFd::open(test_file_path, td::FileFd::Write | td::FileFd::CreateNew).move_as_ok();

  // First batch: 1 + 1 + 2 bytes.
  vec.push_back(td::as_io_slice("a"));
  vec.push_back(td::as_io_slice("b"));
  vec.push_back(td::as_io_slice("cd"));
  ASSERT_EQ(4u, fd.writev(vec).move_as_ok());

  // Second batch contains an empty slice, which must contribute nothing: 3 + 0 + 2 bytes.
  vec.clear();
  vec.push_back(td::as_io_slice("efg"));
  vec.push_back(td::as_io_slice(""));
  vec.push_back(td::as_io_slice("hi"));
  ASSERT_EQ(5u, fd.writev(vec).move_as_ok());
  fd.close();

  // Read the file back and compare it with the concatenation of both batches.
  fd = td::FileFd::open(test_file_path, td::FileFd::Read).move_as_ok();
  td::Slice expected_content = "abcdefghi";
  ASSERT_EQ(static_cast<td::int64>(expected_content.size()), fd.get_size().ok());
  td::string content(expected_content.size(), '\0');
  ASSERT_EQ(content.size(), fd.read(content).move_as_ok());
  ASSERT_EQ(expected_content, content);

  // Release the descriptor and delete the scratch file, mirroring the cleanup
  // done by the SparseFiles test.
  fd.close();
  td::unlink(test_file_path).ensure();
}
147 
148 #if TD_PORT_POSIX && !TD_THREAD_UNSUPPORTED
149 
150 static std::mutex m;
151 static td::vector<td::string> ptrs;
152 static td::vector<int *> addrs;
153 static TD_THREAD_LOCAL int thread_id;
154 
on_user_signal(int sig)155 static void on_user_signal(int sig) {
156   int addr;
157   addrs[thread_id] = &addr;
158   std::unique_lock<std::mutex> guard(m);
159   ptrs.push_back(td::to_string(thread_id));
160 }
161 
// Raises the user signal from several threads and checks what on_user_signal
// recorded: first with the threads signalling strictly one after another, then
// with all of them signalling after a common rendezvous.
TEST(Port, SignalsAndThread) {
  td::setup_signals_alt_stack().ensure();
  td::set_signal_handler(td::SignalType::User, on_user_signal).ensure();
  SCOPE_EXIT {
    td::set_signal_handler(td::SignalType::User, nullptr).ensure();
  };

  // Expected content of ptrs. The number of threads is derived from it, so the
  // thread count, the loop bounds and the expectation cannot drift apart.
  const td::vector<td::string> ans = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
  const int thread_n = static_cast<int>(ans.size());

  // Phase 1: thread i raises the signal only after thread i - 1 has done so;
  // stages[i] is shared by exactly those two threads (presumably a two-party
  // rendezvous - confirm against td::Stage). The handler log must be in id order.
  {
    td::vector<td::thread> threads;
    td::vector<td::Stage> stages(thread_n);
    ptrs.clear();
    addrs.resize(thread_n);
    for (int i = 0; i < thread_n; i++) {
      threads.emplace_back([&, i] {
        td::setup_signals_alt_stack().ensure();
        if (i != 0) {
          stages[i].wait(2);
        }
        thread_id = i;
        pthread_kill(pthread_self(), SIGUSR1);
        if (i + 1 < thread_n) {
          stages[i + 1].wait(2);
        }
      });
    }
    for (auto &t : threads) {
      t.join();
    }
    CHECK(ptrs == ans);

    //LOG(ERROR) << ptrs;
    //LOG(ERROR) << addrs;
  }

  // Phase 2: all threads meet at a single stage and then raise the signal
  // without further ordering, so the log is compared after sorting. Every
  // thread must also have recorded a distinct handler-local address.
  {
    td::Stage stage;
    td::vector<td::thread> threads;
    ptrs.clear();
    addrs.resize(thread_n);
    for (int i = 0; i < thread_n; i++) {
      threads.emplace_back([&, i] {
        stage.wait(thread_n);
        thread_id = i;
        pthread_kill(pthread_self(), SIGUSR1);
      });
    }
    for (auto &t : threads) {
      t.join();
    }
    std::sort(ptrs.begin(), ptrs.end());
    CHECK(ptrs == ans);
    // td::unique must not remove anything: no two threads recorded the same address.
    auto addrs_size = addrs.size();
    td::unique(addrs);
    ASSERT_EQ(addrs_size, addrs.size());
    //LOG(ERROR) << addrs;
  }
}
221 
222 #if !TD_EVENTFD_UNSUPPORTED
// Measures EventFd::wait with several timeouts while a helper thread keeps
// sending the user signal to the waiting thread, and checks that no wait
// returned before its timeout had elapsed.
TEST(Port, EventFdAndSignals) {
  // Install a no-op handler for the duration of the test.
  td::set_signal_handler(td::SignalType::User, [](int signal) {}).ensure();
  SCOPE_EXIT {
    td::set_signal_handler(td::SignalType::User, nullptr).ensure();
  };

  // The flag stays set while the helper thread should keep sending signals.
  std::atomic_flag keep_interrupting;
  keep_interrupting.test_and_set();
  auto waiter_thread = pthread_self();
  td::thread interrupt_thread{[&keep_interrupting, &waiter_thread] {
    td::setup_signals_alt_stack().ensure();
    while (keep_interrupting.test_and_set()) {
      pthread_kill(waiter_thread, SIGUSR1);
      td::usleep_for(1000 * td::Random::fast(1, 10));  // 0.001s - 0.01s
    }
  }};

  for (int timeout_ms : {0, 1, 2, 10, 100, 500}) {
    // At least 5 rounds per timeout; more rounds for the short timeouts.
    const int rounds = td::max(5, 1000 / td::max(timeout_ms, 1));

    // Smallest and largest difference, in milliseconds, between measured and requested wait.
    double min_diff = 10000000;
    double max_diff = 0;
    for (int round = 0; round < rounds; round++) {
      td::EventFd event_fd;
      event_fd.init();

      auto started_at = td::Timestamp::now();
      event_fd.wait(timeout_ms);
      auto finished_at = td::Timestamp::now();

      auto elapsed = finished_at.at() - started_at.at();
      auto diff = elapsed * 1000 - timeout_ms;
      min_diff = td::min(min_diff, diff);
      max_diff = td::max(max_diff, diff);
    }

    // A negative difference would mean a wait ended before its timeout.
    LOG_CHECK(min_diff >= 0) << min_diff;
    // LOG_CHECK(max_diff < 10) << max_diff;
    LOG(INFO) << min_diff << " " << max_diff;
  }

  // Let the helper thread leave its loop.
  keep_interrupting.clear();
}
261 #endif
262 #endif
263