1 // Licensed to the Apache Software Foundation (ASF) under one 2 // or more contributor license agreements. See the NOTICE file 3 // distributed with this work for additional information 4 // regarding copyright ownership. The ASF licenses this file 5 // to you under the Apache License, Version 2.0 (the 6 // "License"); you may not use this file except in compliance 7 // with the License. You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, 12 // software distributed under the License is distributed on an 13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 // KIND, either express or implied. See the License for the 15 // specific language governing permissions and limitations 16 // under the License. 17 18 // Interfaces to use for defining Flight RPC servers. API should be considered 19 // experimental for now 20 21 #pragma once 22 23 #include <functional> 24 #include <memory> 25 #include <string> 26 #include <utility> 27 #include <vector> 28 29 #include "arrow/flight/server_auth.h" 30 #include "arrow/flight/types.h" // IWYU pragma: keep 31 #include "arrow/flight/visibility.h" // IWYU pragma: keep 32 #include "arrow/ipc/dictionary.h" 33 #include "arrow/ipc/options.h" 34 #include "arrow/record_batch.h" 35 36 namespace arrow { 37 38 class Schema; 39 class Status; 40 41 namespace flight { 42 43 class ServerMiddleware; 44 class ServerMiddlewareFactory; 45 46 /// \brief Interface that produces a sequence of IPC payloads to be sent in 47 /// FlightData protobuf messages 48 class ARROW_FLIGHT_EXPORT FlightDataStream { 49 public: 50 virtual ~FlightDataStream(); 51 52 virtual std::shared_ptr<Schema> schema() = 0; 53 54 /// \brief Compute FlightPayload containing serialized RecordBatch schema 55 virtual Status GetSchemaPayload(FlightPayload* payload) = 0; 56 57 // When the stream is completed, the last payload written will have null 58 // metadata 59 virtual Status Next(FlightPayload* payload) = 0; 60 }; 61 62 /// \brief A basic implementation of FlightDataStream that will provide 63 /// a sequence of FlightData messages to be written to a gRPC stream 64 class ARROW_FLIGHT_EXPORT RecordBatchStream : public FlightDataStream { 65 public: 66 /// \param[in] reader produces a sequence of record batches 67 /// \param[in] options IPC options for writing 68 explicit RecordBatchStream( 69 const std::shared_ptr<RecordBatchReader>& reader, 70 const ipc::IpcWriteOptions& options = ipc::IpcWriteOptions::Defaults()); 71 ~RecordBatchStream() override; 72 73 std::shared_ptr<Schema> schema() override; 74 Status GetSchemaPayload(FlightPayload* payload) override; 75 Status Next(FlightPayload* payload) override; 76 77 private: 78 class RecordBatchStreamImpl; 79 std::unique_ptr<RecordBatchStreamImpl> impl_; 80 }; 81 82 /// \brief A reader for IPC payloads uploaded by a client. Also allows 83 /// reading application-defined metadata via the Flight protocol. 84 class ARROW_FLIGHT_EXPORT FlightMessageReader : public MetadataRecordBatchReader { 85 public: 86 /// \brief Get the descriptor for this upload. 87 virtual const FlightDescriptor& descriptor() const = 0; 88 }; 89 90 /// \brief A writer for application-specific metadata sent back to the 91 /// client during an upload. 92 class ARROW_FLIGHT_EXPORT FlightMetadataWriter { 93 public: 94 virtual ~FlightMetadataWriter(); 95 /// \brief Send a message to the client. 96 virtual Status WriteMetadata(const Buffer& app_metadata) = 0; 97 }; 98 99 /// \brief A writer for IPC payloads to a client. Also allows sending 100 /// application-defined metadata via the Flight protocol. 101 /// 102 /// This class offers more control compared to FlightDataStream, 103 /// including the option to write metadata without data and the 104 /// ability to interleave reading and writing. 105 class ARROW_FLIGHT_EXPORT FlightMessageWriter : public MetadataRecordBatchWriter { 106 public: 107 virtual ~FlightMessageWriter() = default; 108 }; 109 110 /// \brief Call state/contextual data. 111 class ARROW_FLIGHT_EXPORT ServerCallContext { 112 public: 113 virtual ~ServerCallContext() = default; 114 /// \brief The name of the authenticated peer (may be the empty string) 115 virtual const std::string& peer_identity() const = 0; 116 /// \brief The peer address (not validated) 117 virtual const std::string& peer() const = 0; 118 /// \brief Look up a middleware by key. Do not maintain a reference 119 /// to the object beyond the request body. 120 /// \return The middleware, or nullptr if not found. 121 virtual ServerMiddleware* GetMiddleware(const std::string& key) const = 0; 122 /// \brief Check if the current RPC has been cancelled (by the client, by 123 /// a network error, etc.). 124 virtual bool is_cancelled() const = 0; 125 }; 126 127 class ARROW_FLIGHT_EXPORT FlightServerOptions { 128 public: 129 explicit FlightServerOptions(const Location& location_); 130 131 ~FlightServerOptions(); 132 133 /// \brief The host & port (or domain socket path) to listen on. 134 /// Use port 0 to bind to an available port. 135 Location location; 136 /// \brief The authentication handler to use. 137 std::shared_ptr<ServerAuthHandler> auth_handler; 138 /// \brief A list of TLS certificate+key pairs to use. 139 std::vector<CertKeyPair> tls_certificates; 140 /// \brief Enable mTLS and require that the client present a certificate. 141 bool verify_client; 142 /// \brief If using mTLS, the PEM-encoded root certificate to use. 143 std::string root_certificates; 144 /// \brief A list of server middleware to apply, along with a key to 145 /// identify them by. 146 /// 147 /// Middleware are always applied in the order provided. Duplicate 148 /// keys are an error. 149 std::vector<std::pair<std::string, std::shared_ptr<ServerMiddlewareFactory>>> 150 middleware; 151 152 /// \brief A Flight implementation-specific callback to customize 153 /// transport-specific options. 154 /// 155 /// Not guaranteed to be called. The type of the parameter is 156 /// specific to the Flight implementation. Users should take care to 157 /// link to the same transport implementation as Flight to avoid 158 /// runtime problems. 159 std::function<void(void*)> builder_hook; 160 }; 161 162 /// \brief Skeleton RPC server implementation which can be used to create 163 /// custom servers by implementing its abstract methods 164 class ARROW_FLIGHT_EXPORT FlightServerBase { 165 public: 166 FlightServerBase(); 167 virtual ~FlightServerBase(); 168 169 // Lifecycle methods. 170 171 /// \brief Initialize a Flight server listening at the given location. 172 /// This method must be called before any other method. 173 /// \param[in] options The configuration for this server. 174 Status Init(const FlightServerOptions& options); 175 176 /// \brief Get the port that the Flight server is listening on. 177 /// This method must only be called after Init(). Will return a 178 /// non-positive value if no port exists (e.g. when listening on a 179 /// domain socket). 180 int port() const; 181 182 /// \brief Set the server to stop when receiving any of the given signal 183 /// numbers. 184 /// This method must be called before Serve(). 185 Status SetShutdownOnSignals(const std::vector<int> sigs); 186 187 /// \brief Start serving. 188 /// This method blocks until either Shutdown() is called or one of the signals 189 /// registered in SetShutdownOnSignals() is received. 190 Status Serve(); 191 192 /// \brief Query whether Serve() was interrupted by a signal. 193 /// This method must be called after Serve() has returned. 194 /// 195 /// \return int the signal number that interrupted Serve(), if any, otherwise 0 196 int GotSignal() const; 197 198 /// \brief Shut down the server. Can be called from signal handler or another 199 /// thread while Serve() blocks. 200 /// 201 /// TODO(wesm): Shutdown with deadline 202 Status Shutdown(); 203 204 /// \brief Block until server is terminated with Shutdown. 205 Status Wait(); 206 207 // Implement these methods to create your own server. The default 208 // implementations will return a not-implemented result to the client 209 210 /// \brief Retrieve a list of available fields given an optional opaque 211 /// criteria 212 /// \param[in] context The call context. 213 /// \param[in] criteria may be null 214 /// \param[out] listings the returned listings iterator 215 /// \return Status 216 virtual Status ListFlights(const ServerCallContext& context, const Criteria* criteria, 217 std::unique_ptr<FlightListing>* listings); 218 219 /// \brief Retrieve the schema and an access plan for the indicated 220 /// descriptor 221 /// \param[in] context The call context. 222 /// \param[in] request may be null 223 /// \param[out] info the returned flight info provider 224 /// \return Status 225 virtual Status GetFlightInfo(const ServerCallContext& context, 226 const FlightDescriptor& request, 227 std::unique_ptr<FlightInfo>* info); 228 229 /// \brief Retrieve the schema for the indicated descriptor 230 /// \param[in] context The call context. 231 /// \param[in] request may be null 232 /// \param[out] schema the returned flight schema provider 233 /// \return Status 234 virtual Status GetSchema(const ServerCallContext& context, 235 const FlightDescriptor& request, 236 std::unique_ptr<SchemaResult>* schema); 237 238 /// \brief Get a stream of IPC payloads to put on the wire 239 /// \param[in] context The call context. 240 /// \param[in] request an opaque ticket 241 /// \param[out] stream the returned stream provider 242 /// \return Status 243 virtual Status DoGet(const ServerCallContext& context, const Ticket& request, 244 std::unique_ptr<FlightDataStream>* stream); 245 246 /// \brief Process a stream of IPC payloads sent from a client 247 /// \param[in] context The call context. 248 /// \param[in] reader a sequence of uploaded record batches 249 /// \param[in] writer send metadata back to the client 250 /// \return Status 251 virtual Status DoPut(const ServerCallContext& context, 252 std::unique_ptr<FlightMessageReader> reader, 253 std::unique_ptr<FlightMetadataWriter> writer); 254 255 /// \brief Process a bidirectional stream of IPC payloads 256 /// \param[in] context The call context. 257 /// \param[in] reader a sequence of uploaded record batches 258 /// \param[in] writer send data back to the client 259 /// \return Status 260 virtual Status DoExchange(const ServerCallContext& context, 261 std::unique_ptr<FlightMessageReader> reader, 262 std::unique_ptr<FlightMessageWriter> writer); 263 264 /// \brief Execute an action, return stream of zero or more results 265 /// \param[in] context The call context. 266 /// \param[in] action the action to execute, with type and body 267 /// \param[out] result the result iterator 268 /// \return Status 269 virtual Status DoAction(const ServerCallContext& context, const Action& action, 270 std::unique_ptr<ResultStream>* result); 271 272 /// \brief Retrieve the list of available actions 273 /// \param[in] context The call context. 274 /// \param[out] actions a vector of available action types 275 /// \return Status 276 virtual Status ListActions(const ServerCallContext& context, 277 std::vector<ActionType>* actions); 278 279 private: 280 struct Impl; 281 std::unique_ptr<Impl> impl_; 282 }; 283 284 } // namespace flight 285 } // namespace arrow 286