1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  * All rights reserved.
4  *
5  * This source code is licensed under the BSD-style license found in the
6  * LICENSE file in the root directory of this source tree.
7  */
8 
9 #include "ProxyHandler.h"
10 
11 #include <folly/io/SocketOptionMap.h>
12 #include <folly/io/async/EventBaseManager.h>
13 #include <folly/portability/GFlags.h>
14 #include <proxygen/httpserver/RequestHandler.h>
15 #include <proxygen/httpserver/ResponseBuilder.h>
16 #include <proxygen/lib/http/session/HTTPUpstreamSession.h>
17 #include <proxygen/lib/utils/URL.h>
18 
19 #include "ProxyStats.h"
20 
21 using namespace proxygen;
22 using std::string;
23 using std::unique_ptr;
24 
25 DEFINE_int32(proxy_connect_timeout, 1000, "connect timeout in milliseconds");
26 
27 namespace {
28 static const uint32_t kMinReadSize = 1460;
29 static const uint32_t kMaxReadSize = 4000;
30 
31 static const uint8_t READS_SHUTDOWN = 1;
32 static const uint8_t WRITES_SHUTDOWN = 2;
33 static const uint8_t CLOSED = READS_SHUTDOWN | WRITES_SHUTDOWN;
34 } // namespace
35 
36 namespace ProxyService {
37 
ProxyHandler(ProxyStats * stats,folly::HHWheelTimer * timer)38 ProxyHandler::ProxyHandler(ProxyStats* stats, folly::HHWheelTimer* timer)
39     : stats_(stats), connector_{this, timer}, serverHandler_(*this) {
40 }
41 
~ProxyHandler()42 ProxyHandler::~ProxyHandler() {
43   VLOG(4) << "deleting ProxyHandler";
44 }
45 
onRequest(std::unique_ptr<HTTPMessage> headers)46 void ProxyHandler::onRequest(std::unique_ptr<HTTPMessage> headers) noexcept {
47   // This HTTP proxy does not obey the rules in the spec, such as stripping
48   // hop-by-hop headers.  Example only!
49 
50   stats_->recordRequest();
51   request_ = std::move(headers);
52   proxygen::URL url(request_->getURL());
53 
54   folly::SocketAddress addr;
55   try {
56     // Note, this does a synchronous DNS lookup which is bad in event driven
57     // code
58     addr.setFromHostPort(url.getHost(), url.getPort());
59   } catch (...) {
60     ResponseBuilder(downstream_)
61         .status(503, "Bad Gateway")
62         .body(folly::to<string>("Could not parse server from URL: ",
63                                 request_->getURL()))
64         .sendWithEOM();
65     return;
66   }
67 
68   downstream_->pauseIngress();
69   LOG(INFO) << "Trying to connect to " << addr;
70   auto evb = folly::EventBaseManager::get()->getEventBase();
71   if (request_->getMethod() == HTTPMethod::CONNECT) {
72     upstreamSock_ = folly::AsyncSocket::newSocket(evb);
73     upstreamSock_->connect(this, addr, FLAGS_proxy_connect_timeout);
74   } else {
75     // A more sophisticated proxy would have a connection pool here
76     const folly::SocketOptionMap opts{{{SOL_SOCKET, SO_REUSEADDR}, 1}};
77     downstream_->pauseIngress();
78     connector_.connect(folly::EventBaseManager::get()->getEventBase(),
79                        addr,
80                        std::chrono::milliseconds(FLAGS_proxy_connect_timeout),
81                        opts);
82   }
83 }
84 
onBody(std::unique_ptr<folly::IOBuf> body)85 void ProxyHandler::onBody(std::unique_ptr<folly::IOBuf> body) noexcept {
86   if (txn_) {
87     LOG(INFO) << "Forwarding " << ((body) ? body->computeChainDataLength() : 0)
88               << " body bytes to server";
89     txn_->sendBody(std::move(body));
90   } else if (upstreamSock_) {
91     upstreamEgressPaused_ = true;
92     upstreamSock_->writeChain(this, std::move(body));
93     if (upstreamEgressPaused_) {
94       downstreamIngressPaused_ = true;
95       onServerEgressPaused();
96     }
97   } else {
98     LOG(WARNING) << "Dropping " << ((body) ? body->computeChainDataLength() : 0)
99                  << " body bytes to server";
100   }
101 }
102 
onEOM()103 void ProxyHandler::onEOM() noexcept {
104   if (txn_) {
105     LOG(INFO) << "Forwarding client EOM to server";
106     txn_->sendEOM();
107   } else if (upstreamSock_) {
108     LOG(INFO) << "Closing upgraded socket";
109     sockStatus_ |= WRITES_SHUTDOWN;
110     upstreamSock_->shutdownWrite();
111   } else {
112     LOG(INFO) << "Dropping client EOM to server";
113   }
114 }
115 
connectSuccess(HTTPUpstreamSession * session)116 void ProxyHandler::connectSuccess(HTTPUpstreamSession* session) {
117   LOG(INFO) << "Established " << *session;
118   session_ = std::make_unique<SessionWrapper>(session);
119   txn_ = session->newTransaction(&serverHandler_);
120   LOG(INFO) << "Forwarding client request: " << request_->getURL()
121             << " to server";
122   txn_->sendHeaders(*request_);
123   downstream_->resumeIngress();
124 }
125 
connectError(const folly::AsyncSocketException & ex)126 void ProxyHandler::connectError(const folly::AsyncSocketException& ex) {
127   LOG(ERROR) << "Failed to connect: " << folly::exceptionStr(ex);
128   if (!clientTerminated_) {
129     ResponseBuilder(downstream_).status(503, "Bad Gateway").sendWithEOM();
130   } else {
131     abortDownstream();
132     checkForShutdown();
133   }
134 }
135 
onServerHeadersComplete(unique_ptr<HTTPMessage> msg)136 void ProxyHandler::onServerHeadersComplete(
137     unique_ptr<HTTPMessage> msg) noexcept {
138   CHECK(!clientTerminated_);
139   LOG(INFO) << "Forwarding " << msg->getStatusCode() << " response to client";
140   downstream_->sendHeaders(*msg);
141 }
142 
onServerBody(std::unique_ptr<folly::IOBuf> chain)143 void ProxyHandler::onServerBody(std::unique_ptr<folly::IOBuf> chain) noexcept {
144   CHECK(!clientTerminated_);
145   LOG(INFO) << "Forwarding " << ((chain) ? chain->computeChainDataLength() : 0)
146             << " body bytes to client";
147   downstream_->sendBody(std::move(chain));
148 }
149 
onServerEOM()150 void ProxyHandler::onServerEOM() noexcept {
151   if (!clientTerminated_) {
152     LOG(INFO) << "Forwarding server EOM to client";
153     downstream_->sendEOM();
154   }
155 }
156 
detachServerTransaction()157 void ProxyHandler::detachServerTransaction() noexcept {
158   txn_ = nullptr;
159   checkForShutdown();
160 }
161 
onServerError(const HTTPException & error)162 void ProxyHandler::onServerError(const HTTPException& error) noexcept {
163   LOG(ERROR) << "Server error: " << error;
164   abortDownstream();
165 }
166 
onServerEgressPaused()167 void ProxyHandler::onServerEgressPaused() noexcept {
168   if (!clientTerminated_) {
169     downstream_->pauseIngress();
170   }
171 }
172 
onServerEgressResumed()173 void ProxyHandler::onServerEgressResumed() noexcept {
174   if (!clientTerminated_) {
175     downstream_->resumeIngress();
176   }
177 }
178 
requestComplete()179 void ProxyHandler::requestComplete() noexcept {
180   clientTerminated_ = true;
181   checkForShutdown();
182 }
183 
onError(ProxygenError err)184 void ProxyHandler::onError(ProxygenError err) noexcept {
185   LOG(ERROR) << "Client error: " << proxygen::getErrorString(err);
186   clientTerminated_ = true;
187   if (txn_) {
188     LOG(ERROR) << "Aborting server txn: " << *txn_;
189     txn_->sendAbort();
190   } else if (upstreamSock_) {
191     upstreamSock_.reset();
192   }
193   checkForShutdown();
194 }
195 
onEgressPaused()196 void ProxyHandler::onEgressPaused() noexcept {
197   if (txn_) {
198     txn_->pauseIngress();
199   } else if (upstreamSock_) {
200     upstreamSock_->setReadCB(nullptr);
201   }
202 }
203 
onEgressResumed()204 void ProxyHandler::onEgressResumed() noexcept {
205   if (txn_) {
206     txn_->resumeIngress();
207   } else if (upstreamSock_) {
208     upstreamSock_->setReadCB(this);
209   }
210 }
211 
abortDownstream()212 void ProxyHandler::abortDownstream() {
213   if (!clientTerminated_) {
214     downstream_->sendAbort();
215   }
216 }
217 
checkForShutdown()218 bool ProxyHandler::checkForShutdown() {
219   if (clientTerminated_ && !txn_ &&
220       (!upstreamSock_ || (sockStatus_ == CLOSED && !upstreamEgressPaused_))) {
221     delete this;
222     return true;
223   }
224   return false;
225 }
226 
connectSuccess()227 void ProxyHandler::connectSuccess() noexcept {
228   LOG(INFO) << "Connected to upstream " << upstreamSock_;
229   ResponseBuilder(downstream_).status(200, "OK").send();
230   upstreamSock_->setReadCB(this);
231   downstream_->resumeIngress();
232 }
233 
connectErr(const folly::AsyncSocketException & ex)234 void ProxyHandler::connectErr(const folly::AsyncSocketException& ex) noexcept {
235   connectError(ex);
236 }
237 
getReadBuffer(void ** bufReturn,size_t * lenReturn)238 void ProxyHandler::getReadBuffer(void** bufReturn, size_t* lenReturn) {
239   std::pair<void*, uint32_t> readSpace =
240       body_.preallocate(kMinReadSize, kMaxReadSize);
241   *bufReturn = readSpace.first;
242   *lenReturn = readSpace.second;
243 }
244 
readDataAvailable(size_t len)245 void ProxyHandler::readDataAvailable(size_t len) noexcept {
246   body_.postallocate(len);
247   downstream_->sendBody(body_.move());
248 }
249 
readEOF()250 void ProxyHandler::readEOF() noexcept {
251   sockStatus_ |= READS_SHUTDOWN;
252   onServerEOM();
253 }
254 
readErr(const folly::AsyncSocketException & ex)255 void ProxyHandler::readErr(const folly::AsyncSocketException& ex) noexcept {
256   LOG(ERROR) << "Server read error: " << folly::exceptionStr(ex);
257   abortDownstream();
258   upstreamSock_.reset();
259   checkForShutdown();
260 }
261 
writeSuccess()262 void ProxyHandler::writeSuccess() noexcept {
263   upstreamEgressPaused_ = false;
264   if (downstreamIngressPaused_) {
265     downstreamIngressPaused_ = false;
266     onServerEgressResumed();
267   }
268   checkForShutdown();
269 }
270 
writeErr(size_t,const folly::AsyncSocketException & ex)271 void ProxyHandler::writeErr(size_t /*bytesWritten*/,
272                             const folly::AsyncSocketException& ex) noexcept {
273   LOG(ERROR) << "Server write error: " << folly::exceptionStr(ex);
274   ;
275   upstreamEgressPaused_ = false;
276   abortDownstream();
277   upstreamSock_.reset();
278   checkForShutdown();
279 }
280 
281 } // namespace ProxyService
282