1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <folly/io/async/AsyncPipe.h>
18 
19 #include <fcntl.h>
20 
21 #include <folly/Memory.h>
22 #include <folly/io/async/EventBase.h>
23 #include <folly/portability/GTest.h>
24 
25 using namespace testing;
26 
27 namespace {
28 
29 class TestReadCallback : public folly::AsyncReader::ReadCallback {
30  public:
isBufferMovable()31   bool isBufferMovable() noexcept override { return movable_; }
setMovable(bool movable)32   void setMovable(bool movable) { movable_ = movable; }
33 
readBufferAvailable(std::unique_ptr<folly::IOBuf> readBuf)34   void readBufferAvailable(
35       std::unique_ptr<folly::IOBuf> readBuf) noexcept override {
36     readBuffer_.append(std::move(readBuf));
37   }
38 
readDataAvailable(size_t len)39   void readDataAvailable(size_t len) noexcept override {
40     readBuffer_.postallocate(len);
41   }
42 
getReadBuffer(void ** bufReturn,size_t * lenReturn)43   void getReadBuffer(void** bufReturn, size_t* lenReturn) noexcept override {
44     auto res = readBuffer_.preallocate(4000, 65000);
45     *bufReturn = res.first;
46     *lenReturn = res.second;
47   }
48 
readEOF()49   void readEOF() noexcept override {}
50 
readErr(const folly::AsyncSocketException &)51   void readErr(const folly::AsyncSocketException&) noexcept override {
52     error_ = true;
53   }
54 
getData()55   std::string getData() {
56     auto buf = readBuffer_.move();
57     buf->coalesce();
58     return std::string((char*)buf->data(), buf->length());
59   }
60 
reset()61   void reset() {
62     movable_ = false;
63     error_ = false;
64     readBuffer_.reset();
65   }
66 
67   folly::IOBufQueue readBuffer_{folly::IOBufQueue::cacheChainLength()};
68   bool error_{false};
69   bool movable_{false};
70 };
71 
72 class TestWriteCallback : public folly::AsyncWriter::WriteCallback {
73  public:
writeSuccess()74   void writeSuccess() noexcept override { writes_++; }
75 
writeErr(size_t,const folly::AsyncSocketException &)76   void writeErr(size_t, const folly::AsyncSocketException&) noexcept override {
77     error_ = true;
78   }
79 
reset()80   void reset() {
81     writes_ = 0;
82     error_ = false;
83   }
84 
85   uint32_t writes_{0};
86   bool error_{false};
87 };
88 
89 class AsyncPipeTest : public Test {
90  public:
reset(bool movable)91   void reset(bool movable) {
92     reader_.reset();
93     readCallback_.reset();
94     writer_.reset();
95     writeCallback_.reset();
96 
97     int rc = pipe(pipeFds_);
98     EXPECT_EQ(rc, 0);
99 
100     EXPECT_EQ(::fcntl(pipeFds_[0], F_SETFL, O_NONBLOCK), 0);
101     EXPECT_EQ(::fcntl(pipeFds_[1], F_SETFL, O_NONBLOCK), 0);
102     reader_ = folly::AsyncPipeReader::newReader(
103         &eventBase_, folly::NetworkSocket::fromFd(pipeFds_[0]));
104     writer_ = folly::AsyncPipeWriter::newWriter(
105         &eventBase_, folly::NetworkSocket::fromFd(pipeFds_[1]));
106 
107     readCallback_.setMovable(movable);
108   }
109 
110  protected:
111   folly::EventBase eventBase_;
112   int pipeFds_[2];
113   folly::AsyncPipeReader::UniquePtr reader_;
114   folly::AsyncPipeWriter::UniquePtr writer_;
115   TestReadCallback readCallback_;
116   TestWriteCallback writeCallback_;
117 };
118 
getBuf(const std::string & data)119 std::unique_ptr<folly::IOBuf> getBuf(const std::string& data) {
120   auto buf = folly::IOBuf::copyBuffer(data.c_str(), data.length());
121   return buf;
122 }
123 
124 } // namespace
125 
TEST_F(AsyncPipeTest,simple)126 TEST_F(AsyncPipeTest, simple) {
127   for (int pass = 0; pass < 2; ++pass) {
128     reset(pass % 2 != 0);
129     reader_->setReadCB(&readCallback_);
130     writer_->write(getBuf("hello"), &writeCallback_);
131     writer_->closeOnEmpty();
132     eventBase_.loop();
133     EXPECT_EQ(readCallback_.getData(), "hello");
134     EXPECT_FALSE(readCallback_.error_);
135     EXPECT_EQ(writeCallback_.writes_, 1);
136     EXPECT_FALSE(writeCallback_.error_);
137   }
138 }
139 
TEST_F(AsyncPipeTest,blocked_writes)140 TEST_F(AsyncPipeTest, blocked_writes) {
141   for (int pass = 0; pass < 2; ++pass) {
142     reset(pass % 2 != 0);
143     uint32_t writeAttempts = 0;
144     do {
145       ++writeAttempts;
146       writer_->write(getBuf("hello"), &writeCallback_);
147     } while (writeCallback_.writes_ == writeAttempts);
148     // there is one blocked write
149     writer_->closeOnEmpty();
150 
151     reader_->setReadCB(&readCallback_);
152 
153     eventBase_.loop();
154     std::string expected;
155     for (uint32_t i = 0; i < writeAttempts; i++) {
156       expected += "hello";
157     }
158     EXPECT_EQ(readCallback_.getData(), expected);
159     EXPECT_FALSE(readCallback_.error_);
160     EXPECT_EQ(writeCallback_.writes_, writeAttempts);
161     EXPECT_FALSE(writeCallback_.error_);
162   }
163 }
164 
TEST_F(AsyncPipeTest,writeOnClose)165 TEST_F(AsyncPipeTest, writeOnClose) {
166   for (int pass = 0; pass < 2; ++pass) {
167     reset(pass % 2 != 0);
168     reader_->setReadCB(&readCallback_);
169     writer_->write(getBuf("hello"), &writeCallback_);
170     writer_->closeOnEmpty();
171     writer_->write(getBuf("hello"), &writeCallback_);
172     eventBase_.loop();
173     EXPECT_EQ(readCallback_.getData(), "hello");
174     EXPECT_FALSE(readCallback_.error_);
175     EXPECT_EQ(writeCallback_.writes_, 1);
176     EXPECT_TRUE(writeCallback_.error_);
177   }
178 }
179