1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under the BSD-style license found in the
6 * LICENSE file in the root directory of this source tree.
7 */
8
9 #include "ProxyHandler.h"
10
11 #include <folly/io/SocketOptionMap.h>
12 #include <folly/io/async/EventBaseManager.h>
13 #include <folly/portability/GFlags.h>
14 #include <proxygen/httpserver/RequestHandler.h>
15 #include <proxygen/httpserver/ResponseBuilder.h>
16 #include <proxygen/lib/http/session/HTTPUpstreamSession.h>
17 #include <proxygen/lib/utils/URL.h>
18
19 #include "ProxyStats.h"
20
21 using namespace proxygen;
22 using std::string;
23 using std::unique_ptr;
24
25 DEFINE_int32(proxy_connect_timeout, 1000, "connect timeout in milliseconds");
26
namespace {
// Arguments handed to body_.preallocate() when the upstream socket asks for a
// read buffer (see ProxyHandler::getReadBuffer()).
constexpr uint32_t kMinReadSize = 1460;
constexpr uint32_t kMaxReadSize = 4000;

// Bit flags accumulated in sockStatus_ for the raw upstream (CONNECT) socket.
constexpr uint8_t READS_SHUTDOWN = 1;
constexpr uint8_t WRITES_SHUTDOWN = 2;
// Both directions shut down.
constexpr uint8_t CLOSED = READS_SHUTDOWN | WRITES_SHUTDOWN;
} // namespace
35
36 namespace ProxyService {
37
ProxyHandler(ProxyStats * stats,folly::HHWheelTimer * timer)38 ProxyHandler::ProxyHandler(ProxyStats* stats, folly::HHWheelTimer* timer)
39 : stats_(stats), connector_{this, timer}, serverHandler_(*this) {
40 }
41
~ProxyHandler()42 ProxyHandler::~ProxyHandler() {
43 VLOG(4) << "deleting ProxyHandler";
44 }
45
onRequest(std::unique_ptr<HTTPMessage> headers)46 void ProxyHandler::onRequest(std::unique_ptr<HTTPMessage> headers) noexcept {
47 // This HTTP proxy does not obey the rules in the spec, such as stripping
48 // hop-by-hop headers. Example only!
49
50 stats_->recordRequest();
51 request_ = std::move(headers);
52 proxygen::URL url(request_->getURL());
53
54 folly::SocketAddress addr;
55 try {
56 // Note, this does a synchronous DNS lookup which is bad in event driven
57 // code
58 addr.setFromHostPort(url.getHost(), url.getPort());
59 } catch (...) {
60 ResponseBuilder(downstream_)
61 .status(503, "Bad Gateway")
62 .body(folly::to<string>("Could not parse server from URL: ",
63 request_->getURL()))
64 .sendWithEOM();
65 return;
66 }
67
68 downstream_->pauseIngress();
69 LOG(INFO) << "Trying to connect to " << addr;
70 auto evb = folly::EventBaseManager::get()->getEventBase();
71 if (request_->getMethod() == HTTPMethod::CONNECT) {
72 upstreamSock_ = folly::AsyncSocket::newSocket(evb);
73 upstreamSock_->connect(this, addr, FLAGS_proxy_connect_timeout);
74 } else {
75 // A more sophisticated proxy would have a connection pool here
76 const folly::SocketOptionMap opts{{{SOL_SOCKET, SO_REUSEADDR}, 1}};
77 downstream_->pauseIngress();
78 connector_.connect(folly::EventBaseManager::get()->getEventBase(),
79 addr,
80 std::chrono::milliseconds(FLAGS_proxy_connect_timeout),
81 opts);
82 }
83 }
84
onBody(std::unique_ptr<folly::IOBuf> body)85 void ProxyHandler::onBody(std::unique_ptr<folly::IOBuf> body) noexcept {
86 if (txn_) {
87 LOG(INFO) << "Forwarding " << ((body) ? body->computeChainDataLength() : 0)
88 << " body bytes to server";
89 txn_->sendBody(std::move(body));
90 } else if (upstreamSock_) {
91 upstreamEgressPaused_ = true;
92 upstreamSock_->writeChain(this, std::move(body));
93 if (upstreamEgressPaused_) {
94 downstreamIngressPaused_ = true;
95 onServerEgressPaused();
96 }
97 } else {
98 LOG(WARNING) << "Dropping " << ((body) ? body->computeChainDataLength() : 0)
99 << " body bytes to server";
100 }
101 }
102
onEOM()103 void ProxyHandler::onEOM() noexcept {
104 if (txn_) {
105 LOG(INFO) << "Forwarding client EOM to server";
106 txn_->sendEOM();
107 } else if (upstreamSock_) {
108 LOG(INFO) << "Closing upgraded socket";
109 sockStatus_ |= WRITES_SHUTDOWN;
110 upstreamSock_->shutdownWrite();
111 } else {
112 LOG(INFO) << "Dropping client EOM to server";
113 }
114 }
115
connectSuccess(HTTPUpstreamSession * session)116 void ProxyHandler::connectSuccess(HTTPUpstreamSession* session) {
117 LOG(INFO) << "Established " << *session;
118 session_ = std::make_unique<SessionWrapper>(session);
119 txn_ = session->newTransaction(&serverHandler_);
120 LOG(INFO) << "Forwarding client request: " << request_->getURL()
121 << " to server";
122 txn_->sendHeaders(*request_);
123 downstream_->resumeIngress();
124 }
125
connectError(const folly::AsyncSocketException & ex)126 void ProxyHandler::connectError(const folly::AsyncSocketException& ex) {
127 LOG(ERROR) << "Failed to connect: " << folly::exceptionStr(ex);
128 if (!clientTerminated_) {
129 ResponseBuilder(downstream_).status(503, "Bad Gateway").sendWithEOM();
130 } else {
131 abortDownstream();
132 checkForShutdown();
133 }
134 }
135
onServerHeadersComplete(unique_ptr<HTTPMessage> msg)136 void ProxyHandler::onServerHeadersComplete(
137 unique_ptr<HTTPMessage> msg) noexcept {
138 CHECK(!clientTerminated_);
139 LOG(INFO) << "Forwarding " << msg->getStatusCode() << " response to client";
140 downstream_->sendHeaders(*msg);
141 }
142
onServerBody(std::unique_ptr<folly::IOBuf> chain)143 void ProxyHandler::onServerBody(std::unique_ptr<folly::IOBuf> chain) noexcept {
144 CHECK(!clientTerminated_);
145 LOG(INFO) << "Forwarding " << ((chain) ? chain->computeChainDataLength() : 0)
146 << " body bytes to client";
147 downstream_->sendBody(std::move(chain));
148 }
149
onServerEOM()150 void ProxyHandler::onServerEOM() noexcept {
151 if (!clientTerminated_) {
152 LOG(INFO) << "Forwarding server EOM to client";
153 downstream_->sendEOM();
154 }
155 }
156
detachServerTransaction()157 void ProxyHandler::detachServerTransaction() noexcept {
158 txn_ = nullptr;
159 checkForShutdown();
160 }
161
onServerError(const HTTPException & error)162 void ProxyHandler::onServerError(const HTTPException& error) noexcept {
163 LOG(ERROR) << "Server error: " << error;
164 abortDownstream();
165 }
166
onServerEgressPaused()167 void ProxyHandler::onServerEgressPaused() noexcept {
168 if (!clientTerminated_) {
169 downstream_->pauseIngress();
170 }
171 }
172
onServerEgressResumed()173 void ProxyHandler::onServerEgressResumed() noexcept {
174 if (!clientTerminated_) {
175 downstream_->resumeIngress();
176 }
177 }
178
requestComplete()179 void ProxyHandler::requestComplete() noexcept {
180 clientTerminated_ = true;
181 checkForShutdown();
182 }
183
onError(ProxygenError err)184 void ProxyHandler::onError(ProxygenError err) noexcept {
185 LOG(ERROR) << "Client error: " << proxygen::getErrorString(err);
186 clientTerminated_ = true;
187 if (txn_) {
188 LOG(ERROR) << "Aborting server txn: " << *txn_;
189 txn_->sendAbort();
190 } else if (upstreamSock_) {
191 upstreamSock_.reset();
192 }
193 checkForShutdown();
194 }
195
onEgressPaused()196 void ProxyHandler::onEgressPaused() noexcept {
197 if (txn_) {
198 txn_->pauseIngress();
199 } else if (upstreamSock_) {
200 upstreamSock_->setReadCB(nullptr);
201 }
202 }
203
onEgressResumed()204 void ProxyHandler::onEgressResumed() noexcept {
205 if (txn_) {
206 txn_->resumeIngress();
207 } else if (upstreamSock_) {
208 upstreamSock_->setReadCB(this);
209 }
210 }
211
abortDownstream()212 void ProxyHandler::abortDownstream() {
213 if (!clientTerminated_) {
214 downstream_->sendAbort();
215 }
216 }
217
checkForShutdown()218 bool ProxyHandler::checkForShutdown() {
219 if (clientTerminated_ && !txn_ &&
220 (!upstreamSock_ || (sockStatus_ == CLOSED && !upstreamEgressPaused_))) {
221 delete this;
222 return true;
223 }
224 return false;
225 }
226
connectSuccess()227 void ProxyHandler::connectSuccess() noexcept {
228 LOG(INFO) << "Connected to upstream " << upstreamSock_;
229 ResponseBuilder(downstream_).status(200, "OK").send();
230 upstreamSock_->setReadCB(this);
231 downstream_->resumeIngress();
232 }
233
connectErr(const folly::AsyncSocketException & ex)234 void ProxyHandler::connectErr(const folly::AsyncSocketException& ex) noexcept {
235 connectError(ex);
236 }
237
getReadBuffer(void ** bufReturn,size_t * lenReturn)238 void ProxyHandler::getReadBuffer(void** bufReturn, size_t* lenReturn) {
239 std::pair<void*, uint32_t> readSpace =
240 body_.preallocate(kMinReadSize, kMaxReadSize);
241 *bufReturn = readSpace.first;
242 *lenReturn = readSpace.second;
243 }
244
readDataAvailable(size_t len)245 void ProxyHandler::readDataAvailable(size_t len) noexcept {
246 body_.postallocate(len);
247 downstream_->sendBody(body_.move());
248 }
249
readEOF()250 void ProxyHandler::readEOF() noexcept {
251 sockStatus_ |= READS_SHUTDOWN;
252 onServerEOM();
253 }
254
readErr(const folly::AsyncSocketException & ex)255 void ProxyHandler::readErr(const folly::AsyncSocketException& ex) noexcept {
256 LOG(ERROR) << "Server read error: " << folly::exceptionStr(ex);
257 abortDownstream();
258 upstreamSock_.reset();
259 checkForShutdown();
260 }
261
writeSuccess()262 void ProxyHandler::writeSuccess() noexcept {
263 upstreamEgressPaused_ = false;
264 if (downstreamIngressPaused_) {
265 downstreamIngressPaused_ = false;
266 onServerEgressResumed();
267 }
268 checkForShutdown();
269 }
270
writeErr(size_t,const folly::AsyncSocketException & ex)271 void ProxyHandler::writeErr(size_t /*bytesWritten*/,
272 const folly::AsyncSocketException& ex) noexcept {
273 LOG(ERROR) << "Server write error: " << folly::exceptionStr(ex);
274 ;
275 upstreamEgressPaused_ = false;
276 abortDownstream();
277 upstreamSock_.reset();
278 checkForShutdown();
279 }
280
281 } // namespace ProxyService
282