1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <folly/io/async/AsyncPipe.h>
18
19 #include <fcntl.h>
20
21 #include <folly/Memory.h>
22 #include <folly/io/async/EventBase.h>
23 #include <folly/portability/GTest.h>
24
25 using namespace testing;
26
27 namespace {
28
29 class TestReadCallback : public folly::AsyncReader::ReadCallback {
30 public:
isBufferMovable()31 bool isBufferMovable() noexcept override { return movable_; }
setMovable(bool movable)32 void setMovable(bool movable) { movable_ = movable; }
33
readBufferAvailable(std::unique_ptr<folly::IOBuf> readBuf)34 void readBufferAvailable(
35 std::unique_ptr<folly::IOBuf> readBuf) noexcept override {
36 readBuffer_.append(std::move(readBuf));
37 }
38
readDataAvailable(size_t len)39 void readDataAvailable(size_t len) noexcept override {
40 readBuffer_.postallocate(len);
41 }
42
getReadBuffer(void ** bufReturn,size_t * lenReturn)43 void getReadBuffer(void** bufReturn, size_t* lenReturn) noexcept override {
44 auto res = readBuffer_.preallocate(4000, 65000);
45 *bufReturn = res.first;
46 *lenReturn = res.second;
47 }
48
readEOF()49 void readEOF() noexcept override {}
50
readErr(const folly::AsyncSocketException &)51 void readErr(const folly::AsyncSocketException&) noexcept override {
52 error_ = true;
53 }
54
getData()55 std::string getData() {
56 auto buf = readBuffer_.move();
57 buf->coalesce();
58 return std::string((char*)buf->data(), buf->length());
59 }
60
reset()61 void reset() {
62 movable_ = false;
63 error_ = false;
64 readBuffer_.reset();
65 }
66
67 folly::IOBufQueue readBuffer_{folly::IOBufQueue::cacheChainLength()};
68 bool error_{false};
69 bool movable_{false};
70 };
71
72 class TestWriteCallback : public folly::AsyncWriter::WriteCallback {
73 public:
writeSuccess()74 void writeSuccess() noexcept override { writes_++; }
75
writeErr(size_t,const folly::AsyncSocketException &)76 void writeErr(size_t, const folly::AsyncSocketException&) noexcept override {
77 error_ = true;
78 }
79
reset()80 void reset() {
81 writes_ = 0;
82 error_ = false;
83 }
84
85 uint32_t writes_{0};
86 bool error_{false};
87 };
88
89 class AsyncPipeTest : public Test {
90 public:
reset(bool movable)91 void reset(bool movable) {
92 reader_.reset();
93 readCallback_.reset();
94 writer_.reset();
95 writeCallback_.reset();
96
97 int rc = pipe(pipeFds_);
98 EXPECT_EQ(rc, 0);
99
100 EXPECT_EQ(::fcntl(pipeFds_[0], F_SETFL, O_NONBLOCK), 0);
101 EXPECT_EQ(::fcntl(pipeFds_[1], F_SETFL, O_NONBLOCK), 0);
102 reader_ = folly::AsyncPipeReader::newReader(
103 &eventBase_, folly::NetworkSocket::fromFd(pipeFds_[0]));
104 writer_ = folly::AsyncPipeWriter::newWriter(
105 &eventBase_, folly::NetworkSocket::fromFd(pipeFds_[1]));
106
107 readCallback_.setMovable(movable);
108 }
109
110 protected:
111 folly::EventBase eventBase_;
112 int pipeFds_[2];
113 folly::AsyncPipeReader::UniquePtr reader_;
114 folly::AsyncPipeWriter::UniquePtr writer_;
115 TestReadCallback readCallback_;
116 TestWriteCallback writeCallback_;
117 };
118
getBuf(const std::string & data)119 std::unique_ptr<folly::IOBuf> getBuf(const std::string& data) {
120 auto buf = folly::IOBuf::copyBuffer(data.c_str(), data.length());
121 return buf;
122 }
123
124 } // namespace
125
TEST_F(AsyncPipeTest,simple)126 TEST_F(AsyncPipeTest, simple) {
127 for (int pass = 0; pass < 2; ++pass) {
128 reset(pass % 2 != 0);
129 reader_->setReadCB(&readCallback_);
130 writer_->write(getBuf("hello"), &writeCallback_);
131 writer_->closeOnEmpty();
132 eventBase_.loop();
133 EXPECT_EQ(readCallback_.getData(), "hello");
134 EXPECT_FALSE(readCallback_.error_);
135 EXPECT_EQ(writeCallback_.writes_, 1);
136 EXPECT_FALSE(writeCallback_.error_);
137 }
138 }
139
TEST_F(AsyncPipeTest,blocked_writes)140 TEST_F(AsyncPipeTest, blocked_writes) {
141 for (int pass = 0; pass < 2; ++pass) {
142 reset(pass % 2 != 0);
143 uint32_t writeAttempts = 0;
144 do {
145 ++writeAttempts;
146 writer_->write(getBuf("hello"), &writeCallback_);
147 } while (writeCallback_.writes_ == writeAttempts);
148 // there is one blocked write
149 writer_->closeOnEmpty();
150
151 reader_->setReadCB(&readCallback_);
152
153 eventBase_.loop();
154 std::string expected;
155 for (uint32_t i = 0; i < writeAttempts; i++) {
156 expected += "hello";
157 }
158 EXPECT_EQ(readCallback_.getData(), expected);
159 EXPECT_FALSE(readCallback_.error_);
160 EXPECT_EQ(writeCallback_.writes_, writeAttempts);
161 EXPECT_FALSE(writeCallback_.error_);
162 }
163 }
164
TEST_F(AsyncPipeTest,writeOnClose)165 TEST_F(AsyncPipeTest, writeOnClose) {
166 for (int pass = 0; pass < 2; ++pass) {
167 reset(pass % 2 != 0);
168 reader_->setReadCB(&readCallback_);
169 writer_->write(getBuf("hello"), &writeCallback_);
170 writer_->closeOnEmpty();
171 writer_->write(getBuf("hello"), &writeCallback_);
172 eventBase_.loop();
173 EXPECT_EQ(readCallback_.getData(), "hello");
174 EXPECT_FALSE(readCallback_.error_);
175 EXPECT_EQ(writeCallback_.writes_, 1);
176 EXPECT_TRUE(writeCallback_.error_);
177 }
178 }
179