1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <folly/experimental/io/SimpleAsyncIO.h>
18
19 #include <bitset>
20
21 #include <folly/File.h>
22 #include <folly/Random.h>
23 #include <folly/experimental/coro/BlockingWait.h>
24 #include <folly/experimental/coro/Collect.h>
25 #include <folly/io/IOBuf.h>
26 #include <folly/portability/GTest.h>
27 #include <folly/synchronization/Baton.h>
28
29 #include <glog/logging.h>
30
31 using namespace folly;
32
33 class SimpleAsyncIOTest : public ::testing::TestWithParam<SimpleAsyncIO::Mode> {
34 public:
SetUp()35 void SetUp() override { config_.setMode(GetParam()); }
36
testTypeToString(testing::TestParamInfo<SimpleAsyncIO::Mode> const & setting)37 static std::string testTypeToString(
38 testing::TestParamInfo<SimpleAsyncIO::Mode> const& setting) {
39 switch (setting.param) {
40 case SimpleAsyncIO::Mode::AIO:
41 return "aio";
42 case SimpleAsyncIO::Mode::IOURING:
43 return "iouring";
44 }
45 }
46
47 protected:
48 SimpleAsyncIO::Config config_;
49 };
50
// Writes a short string to a temporary file through SimpleAsyncIO, reads it
// back, and checks both completion codes and the bytes returned.
TEST_P(SimpleAsyncIOTest, WriteAndReadBack) {
  auto tmpfile = File::temporary();

  // Everything the completion callbacks touch is declared BEFORE `aio`, so it
  // is destroyed AFTER it. If one of the ASSERTs below returns early while a
  // request is still in flight, the callback must not run against objects
  // that are already gone. (DestroyWithPendingIO expects the SimpleAsyncIO
  // destructor to wait for outstanding IO.)
  Baton done;
  int result = -1; // sentinel: never a valid byte count for this test
  const std::string data("Green Room Rockers");
  const int expectedBytes = static_cast<int>(data.size());
  std::array<uint8_t, 128> buffer{};

  SimpleAsyncIO aio(config_);

  aio.pwrite(
      tmpfile.fd(), data.data(), data.size(), 0, [&done, &result](int rc) {
        result = rc;
        done.post();
      });
  ASSERT_TRUE(done.try_wait_for(std::chrono::seconds(10)));
  EXPECT_EQ(result, expectedBytes);

  // Re-arm the baton and clear the previous result so the read check cannot
  // pass on the value left behind by the write.
  done.reset();
  result = -1;
  aio.pread(
      tmpfile.fd(), buffer.data(), buffer.size(), 0, [&done, &result](int rc) {
        result = rc;
        done.post();
      });
  ASSERT_TRUE(done.try_wait_for(std::chrono::seconds(10)));
  // The buffer is larger than the file, so the read is expected to return
  // only the bytes that were written.
  EXPECT_EQ(result, expectedBytes);
  EXPECT_EQ(memcmp(buffer.data(), data.data(), data.size()), 0);
}
78
makeRandomBinaryString(size_t size)79 std::string makeRandomBinaryString(size_t size) {
80 std::string content;
81 content.clear();
82 while (content.size() < size) {
83 content.append(std::bitset<8>(folly::Random::rand32()).to_string());
84 }
85 content.resize(size);
86 return content;
87 }
88
// Issues numChunks writes and, from each write's completion callback, a read
// of the same region. Once every read has completed, verifies that each chunk
// read back matches the chunk that was written.
TEST_P(SimpleAsyncIOTest, ChainedReads) {
  auto tmpfile = File::temporary();
  int fd = tmpfile.fd();
  Baton done;

  static const size_t chunkSize = 128;
  static const size_t numChunks = 1000;
  // Completion codes are ints; compare against an int to avoid mixing
  // signedness.
  static const int expectedRc = static_cast<int>(chunkSize);
  std::vector<std::unique_ptr<IOBuf>> writeChunks;
  std::vector<std::unique_ptr<IOBuf>> readChunks;
  std::atomic<uint32_t> completed = 0;

  writeChunks.reserve(numChunks);
  readChunks.reserve(numChunks);
  for (size_t i = 0; i < numChunks; ++i) {
    writeChunks.push_back(IOBuf::copyBuffer(makeRandomBinaryString(chunkSize)));
    readChunks.push_back(IOBuf::create(chunkSize));
  }

  // allow for one read and one write for each chunk to be outstanding.
  // `aio` is declared after everything the callbacks reference, so it is
  // destroyed first if the test returns early.
  SimpleAsyncIO aio(config_.setMaxRequests(numChunks * 2));
  for (size_t i = 0; i < numChunks; ++i) {
    aio.pwrite(
        fd,
        writeChunks[i]->data(),
        chunkSize,
        i * chunkSize,
        [fd, i, &readChunks, &aio, &done, &completed](int writeRc) {
          ASSERT_EQ(writeRc, expectedRc);
          // Chain the read of the same region off the completed write.
          aio.pread(
              fd,
              readChunks[i]->writableData(),
              chunkSize,
              i * chunkSize,
              [&done, &completed](int readRc) {
                ASSERT_EQ(readRc, expectedRc);
                // The callback that observes the final count wakes the test.
                if (++completed == numChunks) {
                  done.post();
                }
              });
        });
  }

  ASSERT_TRUE(done.try_wait_for(std::chrono::seconds(60)));

  // Report mismatches as ordinary test failures (with the offending chunk
  // index) rather than aborting the whole test binary.
  for (size_t i = 0; i < numChunks; ++i) {
    EXPECT_EQ(
        memcmp(writeChunks[i]->data(), readChunks[i]->data(), chunkSize), 0)
        << "chunk " << i << " read back differs from what was written";
  }
}
136
// Queues a batch of writes and lets the SimpleAsyncIO instance go out of scope
// immediately, then checks that every completion callback has run by the time
// the destructor returns.
TEST_P(SimpleAsyncIOTest, DestroyWithPendingIO) {
  static const size_t bufferSize = 128;
  static const size_t numWrites = 100;

  auto tmpfile = File::temporary();
  const int fd = tmpfile.fd();

  std::atomic<uint32_t> completed{0};

  // All-zero payload shared by every write.
  std::array<uint8_t, bufferSize> buffer;
  buffer.fill(0);

  {
    // Scope exists solely so that `aio` is destroyed right after the writes
    // are submitted, without any explicit wait for them.
    SimpleAsyncIO aio(config_);
    for (size_t writeIdx = 0; writeIdx < numWrites; ++writeIdx) {
      const size_t offset = writeIdx * bufferSize;
      aio.pwrite(fd, buffer.data(), bufferSize, offset, [&completed](int rc) {
        ASSERT_EQ(rc, bufferSize);
        ++completed;
      });
    }
  }

  // Leaving the scope above should not return until all IO has finished.
  ASSERT_EQ(completed.load(), numWrites);
}
162
163 #if FOLLY_HAS_COROUTINES
doCoAsyncWrites(SimpleAsyncIO & aio,int fd,std::string const & data,int copies)164 static folly::coro::Task<folly::Unit> doCoAsyncWrites(
165 SimpleAsyncIO& aio, int fd, std::string const& data, int copies) {
166 std::vector<folly::coro::Task<int>> writes;
167
168 for (int i = 0; i < copies; ++i) {
169 writes.emplace_back(
170 aio.co_pwrite(fd, data.data(), data.length(), data.length() * i));
171 }
172
173 auto results = co_await folly::coro::collectAllRange(std::move(writes));
174
175 for (int result : results) {
176 EXPECT_EQ(result, data.length());
177 }
178 co_return Unit{};
179 }
180
doCoAsyncReads(SimpleAsyncIO & aio,int fd,std::string const & data,int copies)181 static folly::coro::Task<folly::Unit> doCoAsyncReads(
182 SimpleAsyncIO& aio, int fd, std::string const& data, int copies) {
183 std::vector<std::unique_ptr<char[]>> buffers;
184 std::vector<folly::coro::Task<int>> reads;
185
186 for (int i = 0; i < copies; ++i) {
187 buffers.emplace_back(std::make_unique<char[]>(data.length()));
188
189 reads.emplace_back(
190 aio.co_pread(fd, buffers[i].get(), data.length(), data.length() * i));
191 }
192
193 auto results = co_await folly::coro::collectAllRange(std::move(reads));
194
195 for (int i = 0; i < copies; ++i) {
196 EXPECT_EQ(results[i], data.length());
197 EXPECT_EQ(::memcmp(data.data(), buffers[i].get(), data.length()), 0);
198 }
199 co_return Unit{};
200 }
201
// Drives the coroutine helpers end to end: writes several copies of a string
// to a temporary file, then reads them back and lets the helpers verify the
// contents.
TEST_P(SimpleAsyncIOTest, CoroutineReadWrite) {
  const int kCopies = 10;
  const std::string payload = "Uncle Touchy goes to college";

  auto tmpfile = File::temporary();
  const int fd = tmpfile.fd();
  SimpleAsyncIO aio(config_);

  // Each phase is run to completion before the next one starts.
  folly::coro::blockingWait(doCoAsyncWrites(aio, fd, payload, kCopies));
  folly::coro::blockingWait(doCoAsyncReads(aio, fd, payload, kCopies));
}
210 #endif // FOLLY_HAS_COROUTINES
211
// Instantiates every TEST_P in SimpleAsyncIOTest once per listed mode, using
// SimpleAsyncIOTest::testTypeToString to name each instantiation. Only the AIO
// mode is currently listed; the IOURING entry is commented out.
INSTANTIATE_TEST_SUITE_P(
    SimpleAsyncIOTests,
    SimpleAsyncIOTest,
    ::testing::Values(
        SimpleAsyncIO::Mode::AIO /*, SimpleAsyncIO::Mode::IOURING */),
    SimpleAsyncIOTest::testTypeToString);
218