1 /* 2 * Copyright (c) Facebook, Inc. and its affiliates. 3 * 4 * This source code is licensed under the MIT license found in the 5 * LICENSE file in the root directory of this source tree. 6 * 7 */ 8 9 #pragma once 10 11 #include <quic/api/QuicSocket.h> 12 13 #include <folly/io/async/EventBase.h> 14 #include <quic/common/BufUtil.h> 15 16 namespace quic { 17 namespace samples { 18 class EchoHandler : public quic::QuicSocket::ConnectionCallback, 19 public quic::QuicSocket::ReadCallback, 20 public quic::QuicSocket::WriteCallback { 21 public: 22 using StreamData = std::pair<BufQueue, bool>; 23 EchoHandler(folly::EventBase * evbIn)24 explicit EchoHandler(folly::EventBase* evbIn) : evb(evbIn) {} 25 setQuicSocket(std::shared_ptr<quic::QuicSocket> socket)26 void setQuicSocket(std::shared_ptr<quic::QuicSocket> socket) { 27 sock = socket; 28 } 29 onNewBidirectionalStream(quic::StreamId id)30 void onNewBidirectionalStream(quic::StreamId id) noexcept override { 31 LOG(INFO) << "Got bidirectional stream id=" << id; 32 sock->setReadCallback(id, this); 33 } 34 onNewUnidirectionalStream(quic::StreamId id)35 void onNewUnidirectionalStream(quic::StreamId id) noexcept override { 36 LOG(INFO) << "Got unidirectional stream id=" << id; 37 sock->setReadCallback(id, this); 38 } 39 onStopSending(quic::StreamId id,quic::ApplicationErrorCode error)40 void onStopSending( 41 quic::StreamId id, 42 quic::ApplicationErrorCode error) noexcept override { 43 LOG(INFO) << "Got StopSending stream id=" << id << " error=" << error; 44 } 45 onConnectionEnd()46 void onConnectionEnd() noexcept override { 47 LOG(INFO) << "Socket closed"; 48 } 49 onConnectionSetupError(std::pair<quic::QuicErrorCode,std::string> error)50 void onConnectionSetupError( 51 std::pair<quic::QuicErrorCode, std::string> error) noexcept override { 52 onConnectionError(std::move(error)); 53 } 54 onConnectionError(std::pair<quic::QuicErrorCode,std::string> error)55 void onConnectionError( 56 std::pair<quic::QuicErrorCode, std::string> error) noexcept override { 57 LOG(ERROR) << "Socket error=" << toString(error.first) << " " 58 << error.second; 59 } 60 readAvailable(quic::StreamId id)61 void readAvailable(quic::StreamId id) noexcept override { 62 LOG(INFO) << "read available for stream id=" << id; 63 64 auto res = sock->read(id, 0); 65 if (res.hasError()) { 66 LOG(ERROR) << "Got error=" << toString(res.error()); 67 return; 68 } 69 if (input_.find(id) == input_.end()) { 70 input_.emplace(id, std::make_pair(BufQueue(), false)); 71 } 72 quic::Buf data = std::move(res.value().first); 73 bool eof = res.value().second; 74 auto dataLen = (data ? data->computeChainDataLength() : 0); 75 LOG(INFO) << "Got len=" << dataLen << " eof=" << uint32_t(eof) 76 << " total=" << input_[id].first.chainLength() + dataLen 77 << " data=" 78 << ((data) ? data->clone()->moveToFbString().toStdString() 79 : std::string()); 80 input_[id].first.append(std::move(data)); 81 input_[id].second = eof; 82 if (eof) { 83 echo(id, input_[id]); 84 } 85 } 86 readError(quic::StreamId id,std::pair<quic::QuicErrorCode,folly::Optional<folly::StringPiece>> error)87 void readError( 88 quic::StreamId id, 89 std::pair<quic::QuicErrorCode, folly::Optional<folly::StringPiece>> 90 error) noexcept override { 91 LOG(ERROR) << "Got read error on stream=" << id 92 << " error=" << toString(error); 93 // A read error only terminates the ingress portion of the stream state. 94 // Your application should probably terminate the egress portion via 95 // resetStream 96 } 97 echo(quic::StreamId id,StreamData & data)98 void echo(quic::StreamId id, StreamData& data) { 99 if (!data.second) { 100 // only echo when eof is present 101 return; 102 } 103 auto echoedData = folly::IOBuf::copyBuffer("echo "); 104 echoedData->prependChain(data.first.move()); 105 auto res = sock->writeChain(id, std::move(echoedData), true, nullptr); 106 if (res.hasError()) { 107 LOG(ERROR) << "write error=" << toString(res.error()); 108 } else { 109 // echo is done, clear EOF 110 data.second = false; 111 } 112 } 113 onStreamWriteReady(quic::StreamId id,uint64_t maxToSend)114 void onStreamWriteReady(quic::StreamId id, uint64_t maxToSend) noexcept 115 override { 116 LOG(INFO) << "socket is write ready with maxToSend=" << maxToSend; 117 echo(id, input_[id]); 118 } 119 onStreamWriteError(quic::StreamId id,std::pair<quic::QuicErrorCode,folly::Optional<folly::StringPiece>> error)120 void onStreamWriteError( 121 quic::StreamId id, 122 std::pair<quic::QuicErrorCode, folly::Optional<folly::StringPiece>> 123 error) noexcept override { 124 LOG(ERROR) << "write error with stream=" << id 125 << " error=" << toString(error); 126 } 127 getEventBase()128 folly::EventBase* getEventBase() { 129 return evb; 130 } 131 132 folly::EventBase* evb; 133 std::shared_ptr<quic::QuicSocket> sock; 134 135 private: 136 std::map<quic::StreamId, StreamData> input_; 137 }; 138 } // namespace samples 139 } // namespace quic 140