1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 // Interfaces to use for defining Flight RPC servers. API should be considered
19 // experimental for now
20 
21 #pragma once
22 
23 #include <functional>
24 #include <memory>
25 #include <string>
26 #include <utility>
27 #include <vector>
28 
29 #include "arrow/flight/server_auth.h"
30 #include "arrow/flight/types.h"       // IWYU pragma: keep
31 #include "arrow/flight/visibility.h"  // IWYU pragma: keep
32 #include "arrow/ipc/dictionary.h"
33 #include "arrow/ipc/options.h"
34 #include "arrow/record_batch.h"
35 
36 namespace arrow {
37 
// Forward declarations of arrow types that the Flight server interfaces
// below refer to (as std::shared_ptr<Schema> and as Status return values).
38 class Schema;
39 class Status;
40 
41 namespace flight {
42 
// Forward declarations of the middleware types. This header only uses them
// through a raw pointer (ServerCallContext::GetMiddleware) and a
// std::shared_ptr (FlightServerOptions::middleware).
43 class ServerMiddleware;
44 class ServerMiddlewareFactory;
45 
46 /// \brief Interface that produces a sequence of IPC payloads to be sent in
47 /// FlightData protobuf messages
///
/// Implementations are handed back to the server through the `stream` output
/// parameter of FlightServerBase::DoGet.
48 class ARROW_FLIGHT_EXPORT FlightDataStream {
49  public:
50   virtual ~FlightDataStream();
51 
  /// \brief Get the schema associated with this stream
52   virtual std::shared_ptr<Schema> schema() = 0;
53 
54   /// \brief Compute FlightPayload containing serialized RecordBatch schema
  /// \param[out] payload the payload to fill in
  /// \return Status
55   virtual Status GetSchemaPayload(FlightPayload* payload) = 0;
56 
57   /// \brief Write the next payload of the stream into \p payload. When the
58   /// stream is completed, the last payload written will have null metadata
  /// \param[out] payload the payload to fill in
  /// \return Status
59   virtual Status Next(FlightPayload* payload) = 0;
60 };
61 
62 /// \brief A basic implementation of FlightDataStream that will provide
63 /// a sequence of FlightData messages to be written to a gRPC stream
64 class ARROW_FLIGHT_EXPORT RecordBatchStream : public FlightDataStream {
65  public:
  /// \brief Construct a stream over the batches produced by \p reader.
66   /// \param[in] reader produces a sequence of record batches
67   /// \param[in] options IPC options for writing
  ///   (defaults to ipc::IpcWriteOptions::Defaults())
68   explicit RecordBatchStream(
69       const std::shared_ptr<RecordBatchReader>& reader,
70       const ipc::IpcWriteOptions& options = ipc::IpcWriteOptions::Defaults());
71   ~RecordBatchStream() override;
72 
  // FlightDataStream interface; see the base class for each method's contract.
73   std::shared_ptr<Schema> schema() override;
74   Status GetSchemaPayload(FlightPayload* payload) override;
75   Status Next(FlightPayload* payload) override;
76 
77  private:
  // The implementation class is only forward-declared here; its definition
  // is not part of this header.
78   class RecordBatchStreamImpl;
79   std::unique_ptr<RecordBatchStreamImpl> impl_;
80 };
81 
82 /// \brief A reader for IPC payloads uploaded by a client. Also allows
83 /// reading application-defined metadata via the Flight protocol.
///
/// Extends MetadataRecordBatchReader (declared outside this header) with
/// access to the descriptor of the upload. Instances are passed to
/// FlightServerBase::DoPut and FlightServerBase::DoExchange.
84 class ARROW_FLIGHT_EXPORT FlightMessageReader : public MetadataRecordBatchReader {
85  public:
86   /// \brief Get the descriptor for this upload.
  /// \return a reference to the descriptor; NOTE(review): presumably valid
  /// only while this reader is alive -- confirm against the implementation.
87   virtual const FlightDescriptor& descriptor() const = 0;
88 };
89 
90 /// \brief A writer for application-specific metadata sent back to the
91 /// client during an upload.
///
/// An instance is passed to FlightServerBase::DoPut.
92 class ARROW_FLIGHT_EXPORT FlightMetadataWriter {
93  public:
94   virtual ~FlightMetadataWriter();
95   /// \brief Send a message to the client.
  /// \param[in] app_metadata the application-defined metadata to send
  /// \return Status
96   virtual Status WriteMetadata(const Buffer& app_metadata) = 0;
97 };
98 
99 /// \brief A writer for IPC payloads to a client. Also allows sending
100 /// application-defined metadata via the Flight protocol.
101 ///
102 /// This class offers more control compared to FlightDataStream,
103 /// including the option to write metadata without data and the
104 /// ability to interleave reading and writing.
///
/// It declares no methods of its own; the writing interface is inherited
/// from MetadataRecordBatchWriter (declared outside this header). An
/// instance is passed to FlightServerBase::DoExchange.
105 class ARROW_FLIGHT_EXPORT FlightMessageWriter : public MetadataRecordBatchWriter {
106  public:
  // Virtual so that an instance can be destroyed through a
  // FlightMessageWriter pointer, e.g. the std::unique_ptr given to DoExchange.
107   virtual ~FlightMessageWriter() = default;
108 };
109 
110 /// \brief Call state/contextual data.
///
/// A purely abstract interface: every accessor below is pure virtual. The
/// RPC handler methods of FlightServerBase receive it by const reference.
111 class ARROW_FLIGHT_EXPORT ServerCallContext {
112  public:
113   virtual ~ServerCallContext() = default;
114   /// \brief The name of the authenticated peer (may be the empty string)
115   virtual const std::string& peer_identity() const = 0;
116   /// \brief The peer address (not validated)
117   virtual const std::string& peer() const = 0;
118   /// \brief Look up a middleware by key. Do not maintain a reference
119   /// to the object beyond the request body.
  /// \param[in] key the key to look up; presumably the key the middleware
  /// factory was registered under in FlightServerOptions::middleware
  /// (TODO confirm)
120   /// \return The middleware, or nullptr if not found.
121   virtual ServerMiddleware* GetMiddleware(const std::string& key) const = 0;
122   /// \brief Check if the current RPC has been cancelled (by the client, by
123   /// a network error, etc.).
  /// \return true if the RPC has been cancelled
124   virtual bool is_cancelled() const = 0;
125 };
126 
/// \brief Configuration for a Flight server, passed to
/// FlightServerBase::Init().
///
/// All settings are public data members that callers assign directly.
127 class ARROW_FLIGHT_EXPORT FlightServerOptions {
128  public:
  /// \brief Create options for a server.
  /// \param[in] location_ presumably the initial value of `location`; the
  /// constructor is defined outside this header (TODO confirm)
129   explicit FlightServerOptions(const Location& location_);
130 
131   ~FlightServerOptions();
132 
133   /// \brief The host & port (or domain socket path) to listen on.
134   /// Use port 0 to bind to an available port.
135   Location location;
136   /// \brief The authentication handler to use.
137   std::shared_ptr<ServerAuthHandler> auth_handler;
138   /// \brief A list of TLS certificate+key pairs to use.
139   std::vector<CertKeyPair> tls_certificates;
140   /// \brief Enable mTLS and require that the client present a certificate.
  ///
  /// NOTE(review): there is no in-class initializer for this flag; confirm
  /// that the out-of-line constructor gives it a defined default.
141   bool verify_client;
142   /// \brief If using mTLS, the PEM-encoded root certificate to use.
143   std::string root_certificates;
144   /// \brief A list of server middleware to apply, along with a key to
145   /// identify them by.
146   ///
147   /// Middleware are always applied in the order provided. Duplicate
148   /// keys are an error.
149   std::vector<std::pair<std::string, std::shared_ptr<ServerMiddlewareFactory>>>
150       middleware;
151 
152   /// \brief A Flight implementation-specific callback to customize
153   /// transport-specific options.
154   ///
155   /// Not guaranteed to be called. The type of the parameter is
156   /// specific to the Flight implementation. Users should take care to
157   /// link to the same transport implementation as Flight to avoid
158   /// runtime problems.
  ///
  /// The argument is an untyped pointer, so the callback must cast it to
  /// the implementation-specific type itself.
159   std::function<void(void*)> builder_hook;
160 };
161 
162 /// \brief Skeleton RPC server implementation which can be used to create
163 /// custom servers by overriding its virtual RPC handler methods
164 class ARROW_FLIGHT_EXPORT FlightServerBase {
165  public:
166   FlightServerBase();
167   virtual ~FlightServerBase();
168 
169   // Lifecycle methods.
170 
171   /// \brief Initialize a Flight server listening at the given location.
172   /// This method must be called before any other method.
173   /// \param[in] options The configuration for this server.
  /// \return Status
174   Status Init(const FlightServerOptions& options);
175 
176   /// \brief Get the port that the Flight server is listening on.
177   /// This method must only be called after Init().  Will return a
178   /// non-positive value if no port exists (e.g. when listening on a
179   /// domain socket).
180   int port() const;
181 
182   /// \brief Set the server to stop when receiving any of the given signal
183   /// numbers.
184   /// This method must be called before Serve().
  /// \param[in] sigs the signal numbers that should stop the server
  /// \return Status
185   Status SetShutdownOnSignals(const std::vector<int> sigs);
186 
187   /// \brief Start serving.
188   /// This method blocks until either Shutdown() is called or one of the signals
189   /// registered in SetShutdownOnSignals() is received.
  /// \return Status
190   Status Serve();
191 
192   /// \brief Query whether Serve() was interrupted by a signal.
193   /// This method must be called after Serve() has returned.
194   ///
195   /// \return int the signal number that interrupted Serve(), if any, otherwise 0
196   int GotSignal() const;
197 
198   /// \brief Shut down the server. Can be called from signal handler or another
199   /// thread while Serve() blocks.
200   ///
201   /// TODO(wesm): Shutdown with deadline
  /// \return Status
202   Status Shutdown();
203 
204   /// \brief Block until server is terminated with Shutdown.
  /// \return Status
205   Status Wait();
206 
207   // Implement these methods to create your own server. The default
208   // implementations will return a not-implemented result to the client
209 
210   /// \brief Retrieve a list of available flights given an optional opaque
211   /// criteria
212   /// \param[in] context The call context.
213   /// \param[in] criteria may be null
214   /// \param[out] listings the returned listings iterator
215   /// \return Status
216   virtual Status ListFlights(const ServerCallContext& context, const Criteria* criteria,
217                              std::unique_ptr<FlightListing>* listings);
218 
219   /// \brief Retrieve the schema and an access plan for the indicated
220   /// descriptor
221   /// \param[in] context The call context.
222   /// \param[in] request the descriptor to look up (a reference, never null)
223   /// \param[out] info the returned flight info provider
224   /// \return Status
225   virtual Status GetFlightInfo(const ServerCallContext& context,
226                                const FlightDescriptor& request,
227                                std::unique_ptr<FlightInfo>* info);
228 
229   /// \brief Retrieve the schema for the indicated descriptor
230   /// \param[in] context The call context.
231   /// \param[in] request the descriptor to look up (a reference, never null)
232   /// \param[out] schema the returned flight schema provider
233   /// \return Status
234   virtual Status GetSchema(const ServerCallContext& context,
235                            const FlightDescriptor& request,
236                            std::unique_ptr<SchemaResult>* schema);
237 
238   /// \brief Get a stream of IPC payloads to put on the wire
239   /// \param[in] context The call context.
240   /// \param[in] request an opaque ticket
241   /// \param[out] stream the returned stream provider
242   /// \return Status
243   virtual Status DoGet(const ServerCallContext& context, const Ticket& request,
244                        std::unique_ptr<FlightDataStream>* stream);
245 
246   /// \brief Process a stream of IPC payloads sent from a client
  ///
  /// The reader and writer are passed by value as std::unique_ptr, so the
  /// handler takes ownership of both.
247   /// \param[in] context The call context.
248   /// \param[in] reader a sequence of uploaded record batches
249   /// \param[in] writer send metadata back to the client
250   /// \return Status
251   virtual Status DoPut(const ServerCallContext& context,
252                        std::unique_ptr<FlightMessageReader> reader,
253                        std::unique_ptr<FlightMetadataWriter> writer);
254 
255   /// \brief Process a bidirectional stream of IPC payloads
  ///
  /// The reader and writer are passed by value as std::unique_ptr, so the
  /// handler takes ownership of both.
256   /// \param[in] context The call context.
257   /// \param[in] reader a sequence of uploaded record batches
258   /// \param[in] writer send data back to the client
259   /// \return Status
260   virtual Status DoExchange(const ServerCallContext& context,
261                             std::unique_ptr<FlightMessageReader> reader,
262                             std::unique_ptr<FlightMessageWriter> writer);
263 
264   /// \brief Execute an action, return stream of zero or more results
265   /// \param[in] context The call context.
266   /// \param[in] action the action to execute, with type and body
267   /// \param[out] result the result iterator
268   /// \return Status
269   virtual Status DoAction(const ServerCallContext& context, const Action& action,
270                           std::unique_ptr<ResultStream>* result);
271 
272   /// \brief Retrieve the list of available actions
273   /// \param[in] context The call context.
274   /// \param[out] actions a vector of available action types
275   /// \return Status
276   virtual Status ListActions(const ServerCallContext& context,
277                              std::vector<ActionType>* actions);
278 
279  private:
  // Server state lives in an implementation struct that is only
  // forward-declared here; its definition is not part of this header.
280   struct Impl;
281   std::unique_ptr<Impl> impl_;
282 };
283 
284 }  // namespace flight
285 }  // namespace arrow
286