1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <chrono>
20 #include <future>
21 #include <memory>
22 #include <string>
23 
24 #include <folly/Try.h>
25 #include <folly/io/async/AsyncServerSocket.h>
26 #include <folly/io/async/ScopedEventBaseThread.h>
27 
28 #include <thrift/lib/cpp2/async/ServerStream.h>
29 #include <thrift/lib/cpp2/transport/rocket/Types.h>
30 #include <thrift/lib/cpp2/transport/rocket/client/RocketClient.h>
31 #include <thrift/lib/cpp2/transport/rocket/framing/Frames.h>
32 #include <thrift/lib/thrift/gen-cpp2/RpcMetadata_types.h>
33 
34 namespace folly {
35 class EventBase;
36 class IOBuf;
37 class SocketAddress;
38 
39 namespace fibers {
40 class FiberManager;
41 } // namespace fibers
42 } // namespace folly
43 
44 namespace wangle {
45 class Acceptor;
46 } // namespace wangle
47 
48 namespace apache {
49 namespace thrift {
50 class RequestClientCallback;
51 class StreamClientCallback;
52 class ChannelClientCallback;
53 class SinkClientCallback;
54 
55 namespace rocket {
56 namespace test {
57 
58 class RocketTestClient {
59  public:
60   explicit RocketTestClient(const folly::SocketAddress& serverAddr);
61   ~RocketTestClient();
62 
63   folly::Try<Payload> sendRequestResponseSync(
64       Payload request,
65       std::chrono::milliseconds timeout = std::chrono::milliseconds(250),
66       RocketClient::WriteSuccessCallback* writeSuccessCallback = nullptr);
67 
68   folly::Try<void> sendRequestFnfSync(Payload request);
69 
70   folly::Try<ClientBufferedStream<Payload>> sendRequestStreamSync(
71       Payload request);
72   void sendRequestSink(SinkClientCallback* callback, Payload request);
73 
74   rocket::SetupFrame makeTestSetupFrame(
75       MetadataOpaqueMap<std::string, std::string> md =
76           MetadataOpaqueMap<std::string, std::string>{
77               {"rando_key", "setup_data"}});
78 
79   void reconnect();
80   void connect();
81   void disconnect();
82 
getRawClient()83   RocketClient& getRawClient() { return *client_; }
84 
getEventBase()85   folly::EventBase& getEventBase() { return evb_; }
86 
87   void verifyVersion();
88 
89  private:
90   folly::ScopedEventBaseThread evbThread_;
91   folly::EventBase& evb_;
92   folly::fibers::FiberManager& fm_;
93   std::unique_ptr<RocketClient, folly::DelayedDestruction::Destructor> client_;
94   const folly::SocketAddress serverAddr_;
95 };
96 
97 class RocketTestServer {
98  public:
99   RocketTestServer();
100   ~RocketTestServer();
101 
102   uint16_t getListeningPort() const;
103   void setExpectedRemainingStreams(size_t n);
104 
105   void setExpectedSetupMetadata(MetadataOpaqueMap<std::string, std::string> md);
106 
107  private:
108   class RocketTestServerHandler;
109 
110   folly::ScopedEventBaseThread ioThread_;
111   folly::EventBase& evb_;
112   folly::AsyncServerSocket::UniquePtr listeningSocket_;
113   MetadataOpaqueMap<std::string, std::string> expectedSetupMetadata_{
114       {"rando_key", "setup_data"}};
115   std::unique_ptr<wangle::Acceptor> acceptor_;
116   std::future<void> shutdownFuture_;
117 
118   void start();
119   void stop();
120 };
121 
122 } // namespace test
123 } // namespace rocket
124 } // namespace thrift
125 } // namespace apache
126