1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <folly/experimental/io/SimpleAsyncIO.h>
18 
19 #include <bitset>
20 
21 #include <folly/File.h>
22 #include <folly/Random.h>
23 #include <folly/experimental/coro/BlockingWait.h>
24 #include <folly/experimental/coro/Collect.h>
25 #include <folly/io/IOBuf.h>
26 #include <folly/portability/GTest.h>
27 #include <folly/synchronization/Baton.h>
28 
29 #include <glog/logging.h>
30 
31 using namespace folly;
32 
33 class SimpleAsyncIOTest : public ::testing::TestWithParam<SimpleAsyncIO::Mode> {
34  public:
SetUp()35   void SetUp() override { config_.setMode(GetParam()); }
36 
testTypeToString(testing::TestParamInfo<SimpleAsyncIO::Mode> const & setting)37   static std::string testTypeToString(
38       testing::TestParamInfo<SimpleAsyncIO::Mode> const& setting) {
39     switch (setting.param) {
40       case SimpleAsyncIO::Mode::AIO:
41         return "aio";
42       case SimpleAsyncIO::Mode::IOURING:
43         return "iouring";
44     }
45   }
46 
47  protected:
48   SimpleAsyncIO::Config config_;
49 };
50 
// Writes a short string at offset 0 with pwrite, reads it back with pread,
// and checks that both completions report data.size() bytes and that the
// bytes read match the bytes written.
TEST_P(SimpleAsyncIOTest, WriteAndReadBack) {
  auto tmpfile = File::temporary();

  // State referenced by the IO requests and their completion callbacks is
  // declared before `aio`, so it is destroyed after `aio`. If one of the
  // ASSERT_TRUE timeouts below returns early while a request is still
  // outstanding, these objects remain alive until `aio` has been torn down.
  Baton done;
  int result = -1; // overwritten by each completion callback
  const std::string data("Green Room Rockers");
  // Zero-initialized so that a short read never leaves indeterminate bytes
  // for the memcmp below.
  std::array<uint8_t, 128> buffer{};

  SimpleAsyncIO aio(config_);

  aio.pwrite(
      tmpfile.fd(), data.data(), data.size(), 0, [&done, &result](int rc) {
        result = rc;
        done.post();
      });
  ASSERT_TRUE(done.try_wait_for(std::chrono::seconds(10)));
  EXPECT_EQ(result, data.size());

  // Reuse the baton for the read; the read asks for the whole buffer but only
  // data.size() bytes are expected back, since that is all that was written.
  done.reset();
  aio.pread(
      tmpfile.fd(), buffer.data(), buffer.size(), 0, [&done, &result](int rc) {
        result = rc;
        done.post();
      });
  ASSERT_TRUE(done.try_wait_for(std::chrono::seconds(10)));
  EXPECT_EQ(result, data.size());
  EXPECT_EQ(memcmp(buffer.data(), data.data(), data.size()), 0);
}
78 
makeRandomBinaryString(size_t size)79 std::string makeRandomBinaryString(size_t size) {
80   std::string content;
81   content.clear();
82   while (content.size() < size) {
83     content.append(std::bitset<8>(folly::Random::rand32()).to_string());
84   }
85   content.resize(size);
86   return content;
87 }
88 
// Issues numChunks writes at consecutive chunk-sized offsets; each write's
// completion callback issues a read of the same region. Once every read has
// completed, the data read back is compared with the data written.
TEST_P(SimpleAsyncIOTest, ChainedReads) {
  auto tmpfile = File::temporary();
  int fd = tmpfile.fd();
  Baton done;

  static const size_t chunkSize = 128;
  static const size_t numChunks = 1000;
  std::vector<std::unique_ptr<IOBuf>> writeChunks;
  std::vector<std::unique_ptr<IOBuf>> readChunks;
  // Number of reads that have completed; incremented from the callbacks.
  std::atomic<uint32_t> completed = 0;

  for (size_t i = 0; i < numChunks; ++i) {
    writeChunks.push_back(IOBuf::copyBuffer(makeRandomBinaryString(chunkSize)));
    readChunks.push_back(IOBuf::create(chunkSize));
  }

  // allow for one read and one write for each chunk to be outstanding.
  SimpleAsyncIO aio(config_.setMaxRequests(numChunks * 2));
  for (size_t i = 0; i < numChunks; ++i) {
    aio.pwrite(
        fd,
        writeChunks[i]->data(),
        chunkSize,
        i * chunkSize,
        [fd, i, &readChunks, &aio, &done, &completed](int rc) {
          ASSERT_EQ(rc, chunkSize);
          // Chain the read of chunk i off the completion of its write.
          aio.pread(
              fd,
              readChunks[i]->writableData(),
              chunkSize,
              i * chunkSize,
              [=, &done, &completed](int rc) {
                ASSERT_EQ(rc, chunkSize);
                // The read that brings the count to numChunks wakes the test.
                if (++completed == numChunks) {
                  done.post();
                }
              });
        });
  }

  ASSERT_TRUE(done.try_wait_for(std::chrono::seconds(60)));

  // Use a gtest expectation rather than glog's CHECK_EQ so that a mismatch is
  // reported as a test failure (with the offending chunk index) instead of
  // aborting the whole test binary.
  for (size_t i = 0; i < numChunks; ++i) {
    EXPECT_EQ(
        memcmp(writeChunks[i]->data(), readChunks[i]->data(), chunkSize), 0)
        << "chunk " << i << " differs between write and read";
  }
}
136 
// Queues a batch of writes and lets the SimpleAsyncIO instance go out of
// scope immediately, then checks that every completion callback has run.
TEST_P(SimpleAsyncIOTest, DestroyWithPendingIO) {
  static const size_t bufferSize = 128;
  static const size_t numWrites = 100;

  auto tmpfile = File::temporary();
  int fd = tmpfile.fd();

  // Bumped once per completed write by the callbacks.
  std::atomic<uint32_t> completed = 0;

  // All writes share one zero-filled source buffer.
  std::array<uint8_t, bufferSize> buffer;
  buffer.fill(0);

  {
    // Submit numWrites writes at consecutive offsets, then fall out of this
    // scope without waiting on any of them.
    SimpleAsyncIO aio(config_);
    size_t i = 0;
    while (i < numWrites) {
      const size_t offset = i * bufferSize;
      aio.pwrite(fd, buffer.data(), bufferSize, offset, [&completed](int rc) {
        ASSERT_EQ(rc, bufferSize);
        ++completed;
      });
      ++i;
    }
  }

  // The destructor is expected to have blocked until all of the IO finished.
  ASSERT_EQ(completed, numWrites);
}
162 
163 #if FOLLY_HAS_COROUTINES
doCoAsyncWrites(SimpleAsyncIO & aio,int fd,std::string const & data,int copies)164 static folly::coro::Task<folly::Unit> doCoAsyncWrites(
165     SimpleAsyncIO& aio, int fd, std::string const& data, int copies) {
166   std::vector<folly::coro::Task<int>> writes;
167 
168   for (int i = 0; i < copies; ++i) {
169     writes.emplace_back(
170         aio.co_pwrite(fd, data.data(), data.length(), data.length() * i));
171   }
172 
173   auto results = co_await folly::coro::collectAllRange(std::move(writes));
174 
175   for (int result : results) {
176     EXPECT_EQ(result, data.length());
177   }
178   co_return Unit{};
179 }
180 
doCoAsyncReads(SimpleAsyncIO & aio,int fd,std::string const & data,int copies)181 static folly::coro::Task<folly::Unit> doCoAsyncReads(
182     SimpleAsyncIO& aio, int fd, std::string const& data, int copies) {
183   std::vector<std::unique_ptr<char[]>> buffers;
184   std::vector<folly::coro::Task<int>> reads;
185 
186   for (int i = 0; i < copies; ++i) {
187     buffers.emplace_back(std::make_unique<char[]>(data.length()));
188 
189     reads.emplace_back(
190         aio.co_pread(fd, buffers[i].get(), data.length(), data.length() * i));
191   }
192 
193   auto results = co_await folly::coro::collectAllRange(std::move(reads));
194 
195   for (int i = 0; i < copies; ++i) {
196     EXPECT_EQ(results[i], data.length());
197     EXPECT_EQ(::memcmp(data.data(), buffers[i].get(), data.length()), 0);
198   }
199   co_return Unit{};
200 }
201 
// Runs the coroutine write helper to completion and then the coroutine read
// helper, both against the same temporary file, payload and copy count.
TEST_P(SimpleAsyncIOTest, CoroutineReadWrite) {
  auto tmpfile = File::temporary();
  int fd = tmpfile.fd();
  SimpleAsyncIO aio(config_);
  std::string testStr = "Uncle Touchy goes to college";

  // Number of back-to-back copies of testStr to write and read back.
  const int copies = 10;

  auto writeTask = doCoAsyncWrites(aio, fd, testStr, copies);
  folly::coro::blockingWait(std::move(writeTask));

  auto readTask = doCoAsyncReads(aio, fd, testStr, copies);
  folly::coro::blockingWait(std::move(readTask));
}
210 #endif // FOLLY_HAS_COROUTINES
211 
212 INSTANTIATE_TEST_SUITE_P(
213     SimpleAsyncIOTests,
214     SimpleAsyncIOTest,
215     ::testing::Values(
216         SimpleAsyncIO::Mode::AIO /*, SimpleAsyncIO::Mode::IOURING */),
217     SimpleAsyncIOTest::testTypeToString);
218