1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * This source code is licensed under the MIT license found in the
5  * LICENSE file in the root directory of this source tree.
6  *
7  */
8 
9 #pragma once
10 
11 #include <quic/api/QuicSocket.h>
12 
13 #include <folly/io/async/EventBase.h>
14 #include <quic/common/BufUtil.h>
15 
16 namespace quic {
17 namespace samples {
18 class EchoHandler : public quic::QuicSocket::ConnectionCallback,
19                     public quic::QuicSocket::ReadCallback,
20                     public quic::QuicSocket::WriteCallback {
21  public:
22   using StreamData = std::pair<BufQueue, bool>;
23 
EchoHandler(folly::EventBase * evbIn)24   explicit EchoHandler(folly::EventBase* evbIn) : evb(evbIn) {}
25 
setQuicSocket(std::shared_ptr<quic::QuicSocket> socket)26   void setQuicSocket(std::shared_ptr<quic::QuicSocket> socket) {
27     sock = socket;
28   }
29 
onNewBidirectionalStream(quic::StreamId id)30   void onNewBidirectionalStream(quic::StreamId id) noexcept override {
31     LOG(INFO) << "Got bidirectional stream id=" << id;
32     sock->setReadCallback(id, this);
33   }
34 
onNewUnidirectionalStream(quic::StreamId id)35   void onNewUnidirectionalStream(quic::StreamId id) noexcept override {
36     LOG(INFO) << "Got unidirectional stream id=" << id;
37     sock->setReadCallback(id, this);
38   }
39 
onStopSending(quic::StreamId id,quic::ApplicationErrorCode error)40   void onStopSending(
41       quic::StreamId id,
42       quic::ApplicationErrorCode error) noexcept override {
43     LOG(INFO) << "Got StopSending stream id=" << id << " error=" << error;
44   }
45 
onConnectionEnd()46   void onConnectionEnd() noexcept override {
47     LOG(INFO) << "Socket closed";
48   }
49 
onConnectionSetupError(std::pair<quic::QuicErrorCode,std::string> error)50   void onConnectionSetupError(
51       std::pair<quic::QuicErrorCode, std::string> error) noexcept override {
52     onConnectionError(std::move(error));
53   }
54 
onConnectionError(std::pair<quic::QuicErrorCode,std::string> error)55   void onConnectionError(
56       std::pair<quic::QuicErrorCode, std::string> error) noexcept override {
57     LOG(ERROR) << "Socket error=" << toString(error.first) << " "
58                << error.second;
59   }
60 
readAvailable(quic::StreamId id)61   void readAvailable(quic::StreamId id) noexcept override {
62     LOG(INFO) << "read available for stream id=" << id;
63 
64     auto res = sock->read(id, 0);
65     if (res.hasError()) {
66       LOG(ERROR) << "Got error=" << toString(res.error());
67       return;
68     }
69     if (input_.find(id) == input_.end()) {
70       input_.emplace(id, std::make_pair(BufQueue(), false));
71     }
72     quic::Buf data = std::move(res.value().first);
73     bool eof = res.value().second;
74     auto dataLen = (data ? data->computeChainDataLength() : 0);
75     LOG(INFO) << "Got len=" << dataLen << " eof=" << uint32_t(eof)
76               << " total=" << input_[id].first.chainLength() + dataLen
77               << " data="
78               << ((data) ? data->clone()->moveToFbString().toStdString()
79                          : std::string());
80     input_[id].first.append(std::move(data));
81     input_[id].second = eof;
82     if (eof) {
83       echo(id, input_[id]);
84     }
85   }
86 
readError(quic::StreamId id,std::pair<quic::QuicErrorCode,folly::Optional<folly::StringPiece>> error)87   void readError(
88       quic::StreamId id,
89       std::pair<quic::QuicErrorCode, folly::Optional<folly::StringPiece>>
90           error) noexcept override {
91     LOG(ERROR) << "Got read error on stream=" << id
92                << " error=" << toString(error);
93     // A read error only terminates the ingress portion of the stream state.
94     // Your application should probably terminate the egress portion via
95     // resetStream
96   }
97 
echo(quic::StreamId id,StreamData & data)98   void echo(quic::StreamId id, StreamData& data) {
99     if (!data.second) {
100       // only echo when eof is present
101       return;
102     }
103     auto echoedData = folly::IOBuf::copyBuffer("echo ");
104     echoedData->prependChain(data.first.move());
105     auto res = sock->writeChain(id, std::move(echoedData), true, nullptr);
106     if (res.hasError()) {
107       LOG(ERROR) << "write error=" << toString(res.error());
108     } else {
109       // echo is done, clear EOF
110       data.second = false;
111     }
112   }
113 
onStreamWriteReady(quic::StreamId id,uint64_t maxToSend)114   void onStreamWriteReady(quic::StreamId id, uint64_t maxToSend) noexcept
115       override {
116     LOG(INFO) << "socket is write ready with maxToSend=" << maxToSend;
117     echo(id, input_[id]);
118   }
119 
onStreamWriteError(quic::StreamId id,std::pair<quic::QuicErrorCode,folly::Optional<folly::StringPiece>> error)120   void onStreamWriteError(
121       quic::StreamId id,
122       std::pair<quic::QuicErrorCode, folly::Optional<folly::StringPiece>>
123           error) noexcept override {
124     LOG(ERROR) << "write error with stream=" << id
125                << " error=" << toString(error);
126   }
127 
getEventBase()128   folly::EventBase* getEventBase() {
129     return evb;
130   }
131 
132   folly::EventBase* evb;
133   std::shared_ptr<quic::QuicSocket> sock;
134 
135  private:
136   std::map<quic::StreamId, StreamData> input_;
137 };
138 } // namespace samples
139 } // namespace quic
140