// Copyright 2021 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_EVENT_ENGINE_EVENT_ENGINE_H
#define GRPC_EVENT_ENGINE_EVENT_ENGINE_H

#include <grpc/support/port_platform.h>

#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <vector>

#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "absl/time/time.h"

#include <grpc/event_engine/endpoint_config.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/port.h>

// TODO(hork): Define the Endpoint::Write metrics collection system
namespace grpc_event_engine {
namespace experimental {

// Forward declaration of the buffer type used by Endpoint::Read and
// Endpoint::Write; its definition is provided separately.
class SliceBuffer;

////////////////////////////////////////////////////////////////////////////////
/// The EventEngine encapsulates all platform-specific behaviors related to
/// low-level network I/O, timers, asynchronous execution, and DNS resolution.
///
/// This interface allows developers to provide their own event management and
/// network stacks. Motivating use cases for supporting custom EventEngines
/// include the ability to hook into external event loops, and to use separate
/// EventEngine instances for each channel to better insulate network I/O and
/// callback processing from other channels.
///
/// A default cross-platform EventEngine instance is provided by gRPC.
///
/// LIFESPAN AND OWNERSHIP
///
/// gRPC takes shared ownership of EventEngines via std::shared_ptrs to ensure
/// that the engines remain available until they are no longer needed. Depending
/// on the use case, engines may live until gRPC is shut down.
///
/// EXAMPLE USAGE (Not yet implemented)
///
/// Custom EventEngines can be specified per channel, and allow configuration
/// for both clients and servers. To set a custom EventEngine for a client
/// channel, you can do something like the following:
///
///    ChannelArguments args;
///    std::shared_ptr<EventEngine> engine = std::make_shared<MyEngine>(...);
///    args.SetEventEngine(engine);
///    MyAppClient client(grpc::CreateCustomChannel(
///        "localhost:50051", grpc::InsecureChannelCredentials(), args));
///
/// A gRPC server can use a custom EventEngine by calling the
/// ServerBuilder::SetEventEngine method:
///
///    ServerBuilder builder;
///    std::shared_ptr<EventEngine> engine = std::make_shared<MyEngine>(...);
///    builder.SetEventEngine(engine);
///    std::unique_ptr<Server> server(builder.BuildAndStart());
///    server->Wait();
///
////////////////////////////////////////////////////////////////////////////////
class EventEngine {
 public:
  /// A custom closure type for EventEngine task execution.
  ///
  /// Throughout the EventEngine API, \a Closure ownership is retained by the
  /// caller - the EventEngine will never delete a Closure, and upon
  /// cancellation, the EventEngine will simply forget the Closure exists. The
  /// caller is responsible for all necessary cleanup.
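  ///
  /// A minimal sketch of a caller-owned Closure implementation (the
  /// \a OnTaskDone name is hypothetical):
  ///
  ///    class OnTaskDone : public EventEngine::Closure {
  ///     public:
  ///      void Run() override { /* application logic here */ }
  ///    };
  ///
  ///    OnTaskDone closure;     // must outlive its scheduled execution
  ///    engine->Run(&closure);  // the EventEngine will never delete it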
  class Closure {
   public:
    Closure() = default;
    // Closures are an interface, and thus non-copyable.
    Closure(const Closure&) = delete;
    Closure& operator=(const Closure&) = delete;
    // Polymorphic type => virtual destructor
    virtual ~Closure() = default;
    // Run the contained code.
    virtual void Run() = 0;
  };
  /// Represents a scheduled task.
  ///
  /// \a TaskHandles are returned by \a Run* methods, and can be given to the
  /// \a Cancel method.
  struct TaskHandle {
    intptr_t keys[2];
  };
  /// Thin wrapper around a platform-specific sockaddr type. A sockaddr struct
  /// exists on all platforms that gRPC supports.
  ///
  /// Platforms are expected to provide definitions for:
  /// * sockaddr
  /// * sockaddr_in
  /// * sockaddr_in6
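  ///
  /// A sketch of wrapping a platform sockaddr_in, assuming POSIX-style socket
  /// definitions are available through <grpc/event_engine/port.h>:
  ///
  ///    sockaddr_in addr;
  ///    memset(&addr, 0, sizeof(addr));
  ///    addr.sin_family = AF_INET;
  ///    addr.sin_port = htons(50051);
  ///    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  ///    EventEngine::ResolvedAddress resolved(
  ///        reinterpret_cast<const sockaddr*>(&addr), sizeof(addr));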
  class ResolvedAddress {
   public:
    static constexpr socklen_t MAX_SIZE_BYTES = 128;

    ResolvedAddress(const sockaddr* address, socklen_t size);
    ResolvedAddress() = default;
    ResolvedAddress(const ResolvedAddress&) = default;
    const struct sockaddr* address() const;
    socklen_t size() const;

   private:
    char address_[MAX_SIZE_BYTES];
    socklen_t size_ = 0;
  };

  /// One end of a connection between a gRPC client and server. Endpoints are
  /// created when connections are established, and Endpoint operations are
  /// gRPC's primary means of communication.
  ///
  /// Endpoints must use the provided MemoryAllocator for all data buffer memory
  /// allocations. gRPC allows applications to set memory constraints per
  /// Channel or Server, and the implementation depends on all dynamic memory
  /// allocation being handled by the quota system.
  class Endpoint {
   public:
    /// Shuts down all connections and invokes all pending read or write
    /// callbacks with an error status.
    virtual ~Endpoint() = default;
    /// Reads data from the Endpoint.
    ///
    /// When data is available on the connection, that data is moved into the
    /// \a buffer, and the \a on_read callback is called. The caller must ensure
    /// that the callback has access to the buffer when executed later.
    /// Ownership of the buffer is not transferred. Valid slices *may* be placed
    /// into the buffer even if the callback is invoked with a non-OK Status.
    ///
    /// There can be at most one outstanding read per Endpoint at any given
    /// time. An outstanding read is one in which the \a on_read callback has
    /// not yet been executed for some previous call to \a Read.  If an attempt
    /// is made to call \a Read while a previous read is still outstanding, the
    /// \a EventEngine must abort.
    ///
    /// For failed read operations, implementations should pass the appropriate
    /// statuses to \a on_read. For example, callbacks might expect to receive
    /// CANCELLED on endpoint shutdown.
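    ///
    /// A minimal usage sketch, assuming a caller-managed \a SliceBuffer named
    /// \a buf that outlives the operation:
    ///
    ///    endpoint->Read(
    ///        [&buf](absl::Status status) {
    ///          if (status.ok()) { /* consume the slices in buf */ }
    ///        },
    ///        &buf);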
    virtual void Read(std::function<void(absl::Status)> on_read,
                      SliceBuffer* buffer) = 0;
    /// Writes data out on the connection.
    ///
    /// \a on_writable is called when the connection is ready for more data. The
    /// Slices within the \a data buffer may be mutated at will by the Endpoint
    /// until \a on_writable is called. The \a data SliceBuffer will remain
    /// valid after calling \a Write, but its state is otherwise undefined.  All
    /// bytes in \a data must have been written before calling \a on_writable
    /// unless an error has occurred.
    ///
    /// There can be at most one outstanding write per Endpoint at any given
    /// time. An outstanding write is one in which the \a on_writable callback
    /// has not yet been executed for some previous call to \a Write.  If an
    /// attempt is made to call \a Write while a previous write is still
    /// outstanding, the \a EventEngine must abort.
    ///
    /// For failed write operations, implementations should pass the appropriate
    /// statuses to \a on_writable. For example, callbacks might expect to
    /// receive CANCELLED on endpoint shutdown.
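    ///
    /// A minimal usage sketch, assuming \a data is a caller-managed
    /// \a SliceBuffer that remains valid until \a on_writable runs:
    ///
    ///    endpoint->Write(
    ///        [](absl::Status status) {
    ///          if (!status.ok()) { /* handle the failed write */ }
    ///        },
    ///        &data);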
    virtual void Write(std::function<void(absl::Status)> on_writable,
                       SliceBuffer* data) = 0;
    /// Returns an address in the format described in DNSResolver. The returned
    /// values are expected to remain valid for the life of the Endpoint.
    virtual const ResolvedAddress& GetPeerAddress() const = 0;
    virtual const ResolvedAddress& GetLocalAddress() const = 0;
  };

  /// Called when a new connection is established.
  ///
  /// If the connection attempt was not successful, implementations should pass
  /// the appropriate statuses to this callback. For example, callbacks might
  /// expect to receive DEADLINE_EXCEEDED statuses when appropriate, or
  /// CANCELLED statuses on EventEngine shutdown.
  using OnConnectCallback =
      std::function<void(absl::StatusOr<std::unique_ptr<Endpoint>>)>;

  /// Listens for incoming connection requests from gRPC clients and initiates
  /// request processing once connections are established.
  class Listener {
   public:
    /// Called when the listener has accepted a new client connection.
    using AcceptCallback = std::function<void(
        std::unique_ptr<Endpoint>, MemoryAllocator memory_allocator)>;
    virtual ~Listener() = default;
    /// Bind an address/port to this Listener.
    ///
    /// It is expected that multiple addresses/ports can be bound to this
    /// Listener before Listener::Start has been called. Returns either the
    /// bound port or an appropriate error status.
    virtual absl::StatusOr<int> Bind(const ResolvedAddress& addr) = 0;
    virtual absl::Status Start() = 0;
  };
  /// Factory method to create a network listener / server.
  ///
  /// Once a \a Listener is created and started, the \a on_accept callback will
  /// be called once asynchronously for each established connection. This method
  /// may return a non-OK status immediately if an error was encountered in any
  /// synchronous steps required to create the Listener. In this case,
  /// \a on_shutdown will never be called.
  ///
  /// If this method returns a Listener, then \a on_shutdown will be invoked
  /// exactly once, when the Listener is shut down. The status passed to it will
  /// indicate if there was a problem during shutdown.
  ///
  /// The provided \a MemoryAllocatorFactory is used to create \a
  /// MemoryAllocators for Endpoint construction.
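  ///
  /// A minimal sketch of creating and starting a listener, where \a config,
  /// \a allocator_factory, and \a some_address are assumed to already exist:
  ///
  ///    auto listener = engine->CreateListener(
  ///        [](std::unique_ptr<Endpoint> endpoint,
  ///           MemoryAllocator allocator) { /* handle the new connection */ },
  ///        [](absl::Status status) { /* the listener has shut down */ },
  ///        config, std::move(allocator_factory));
  ///    if (listener.ok()) {
  ///      absl::StatusOr<int> port = (*listener)->Bind(some_address);
  ///      if (port.ok()) (*listener)->Start().IgnoreError();
  ///    }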
  virtual absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
      Listener::AcceptCallback on_accept,
      std::function<void(absl::Status)> on_shutdown,
      const EndpointConfig& config,
      std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) = 0;
  /// Creates a client network connection to a remote network listener.
  ///
  /// May return an error status immediately if there was a failure in the
  /// synchronous part of establishing a connection. In that event, the \a
  /// on_connect callback *will not* have been executed. Otherwise, it is
  /// expected that the \a on_connect callback will be asynchronously executed
  /// exactly once by the EventEngine.
  ///
  /// Implementation Note: it is important that the \a memory_allocator be used
  /// for all read/write buffer allocations in the EventEngine implementation.
  /// This allows gRPC's \a ResourceQuota system to monitor and control memory
  /// usage with graceful degradation mechanisms. Please see the \a
  /// MemoryAllocator API for more information.
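  ///
  /// A minimal usage sketch, where \a addr, \a args, and \a allocator are
  /// assumed to already exist:
  ///
  ///    absl::Status status = engine->Connect(
  ///        [](absl::StatusOr<std::unique_ptr<Endpoint>> endpoint) {
  ///          if (endpoint.ok()) { /* begin reading and writing */ }
  ///        },
  ///        addr, args, std::move(allocator),
  ///        absl::Now() + absl::Seconds(30));
  ///    if (!status.ok()) { /* on_connect will never be called */ }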
  virtual absl::Status Connect(OnConnectCallback on_connect,
                               const ResolvedAddress& addr,
                               const EndpointConfig& args,
                               MemoryAllocator memory_allocator,
                               absl::Time deadline) = 0;

  /// Provides asynchronous DNS resolution.
  class DNSResolver {
   public:
    /// Task handle for DNS resolution requests.
    struct LookupTaskHandle {
      intptr_t key[2];
    };
    /// DNS SRV record type.
    struct SRVRecord {
      std::string host;
      int port = 0;
      int priority = 0;
      int weight = 0;
    };
    /// Called with the collection of sockaddrs that were resolved from a given
    /// target address.
    using LookupHostnameCallback =
        std::function<void(absl::StatusOr<std::vector<ResolvedAddress>>)>;
    /// Called with a collection of SRV records.
    using LookupSRVCallback =
        std::function<void(absl::StatusOr<std::vector<SRVRecord>>)>;
    /// Called with the result of a TXT record lookup.
    using LookupTXTCallback = std::function<void(absl::StatusOr<std::string>)>;

    virtual ~DNSResolver() = default;

    /// Asynchronously resolve an address.
    ///
    /// \a default_port may be a non-numeric named service port, and will only
    /// be used if \a address does not already contain a port component.
    ///
    /// When the lookup is complete, the \a on_resolve callback will be invoked
    /// with a status indicating the success or failure of the lookup.
    /// Implementations should pass the appropriate statuses to the callback.
    /// For example, callbacks might expect to receive DEADLINE_EXCEEDED or
    /// NOT_FOUND.
    ///
    /// If cancelled, \a on_resolve will not be executed.
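    ///
    /// A minimal usage sketch:
    ///
    ///    auto handle = resolver->LookupHostname(
    ///        [](absl::StatusOr<std::vector<ResolvedAddress>> addresses) {
    ///          if (addresses.ok()) { /* connect to one of the addresses */ }
    ///        },
    ///        "grpc.io", "https", absl::Now() + absl::Seconds(5));
    ///    // If needed, the lookup can later be cancelled:
    ///    //   resolver->CancelLookup(handle);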
    virtual LookupTaskHandle LookupHostname(LookupHostnameCallback on_resolve,
                                            absl::string_view address,
                                            absl::string_view default_port,
                                            absl::Time deadline) = 0;
    /// Asynchronously perform an SRV record lookup.
    ///
    /// \a on_resolve has the same meaning and expectations as \a
    /// LookupHostname's \a on_resolve callback.
    virtual LookupTaskHandle LookupSRV(LookupSRVCallback on_resolve,
                                       absl::string_view name,
                                       absl::Time deadline) = 0;
    /// Asynchronously perform a TXT record lookup.
    ///
    /// \a on_resolve has the same meaning and expectations as \a
    /// LookupHostname's \a on_resolve callback.
    virtual LookupTaskHandle LookupTXT(LookupTXTCallback on_resolve,
                                       absl::string_view name,
                                       absl::Time deadline) = 0;
    /// Cancel an asynchronous lookup operation.
    ///
    /// This shares the same semantics as \a EventEngine::Cancel: successfully
    /// cancelled lookups will not have their callbacks executed, and this
    /// method returns true.
    virtual bool CancelLookup(LookupTaskHandle handle) = 0;
  };

  /// At time of destruction, the EventEngine must have no active
  /// responsibilities. EventEngine users (applications) are responsible for
  /// cancelling all tasks and DNS lookups and for shutting down listeners and
  /// endpoints prior to EventEngine destruction. If any tasks are outstanding,
  /// any listeners are still running, etc., at the time of EventEngine
  /// destruction, that is an invalid use of the API and will result in
  /// undefined behavior.
  virtual ~EventEngine() = default;

  // TODO(nnoble): consider whether we can remove this method before we
  // de-experimentalize this API.
  virtual bool IsWorkerThread() = 0;

  /// Creates and returns an instance of a DNSResolver.
  virtual std::unique_ptr<DNSResolver> GetDNSResolver() = 0;

  /// Asynchronously executes a task as soon as possible.
  ///
  /// \a Closures scheduled with \a Run cannot be cancelled. The \a closure will
  /// not be deleted after it has been run; ownership remains with the caller.
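  ///
  /// A minimal sketch; \a closure is a caller-owned object implementing the
  /// \a Closure interface, as described above:
  ///
  ///    engine->Run(closure);  // closure->Run() executes asynchronously
  ///    // The caller retains ownership and cleans up once it is safe.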
  virtual void Run(Closure* closure) = 0;
  /// Asynchronously executes a task as soon as possible.
  ///
  /// \a Closures scheduled with \a Run cannot be cancelled. Unlike the
  /// overloaded \a Closure alternative, the std::function version's \a closure
  /// will be deleted by the EventEngine after the closure has been run.
  ///
  /// This version of \a Run may be less performant than the \a Closure version
  /// in some scenarios. This overload is useful in situations where performance
  /// is not a critical concern.
  virtual void Run(std::function<void()> closure) = 0;
  /// Synonymous with scheduling an alarm to run at time \a when.
  ///
  /// The \a closure will execute when time \a when arrives unless it has been
  /// cancelled via the \a Cancel method. If cancelled, the closure will not be
  /// run, nor will it be deleted. Ownership remains with the caller.
  virtual TaskHandle RunAt(absl::Time when, Closure* closure) = 0;
  /// Synonymous with scheduling an alarm to run at time \a when.
  ///
  /// The \a closure will execute when time \a when arrives unless it has been
  /// cancelled via the \a Cancel method. If cancelled, the closure will not be
  /// run. Unlike the overloaded \a Closure alternative, the std::function
  /// version's \a closure will be deleted by the EventEngine after the closure
  /// has been run, or upon cancellation.
  ///
  /// This version of \a RunAt may be less performant than the \a Closure
  /// version in some scenarios. This overload is useful in situations where
  /// performance is not a critical concern.
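  ///
  /// A minimal sketch that schedules a timed task and then cancels it:
  ///
  ///    TaskHandle handle = engine->RunAt(
  ///        absl::Now() + absl::Seconds(10),
  ///        [] { /* runs in ~10 seconds unless cancelled */ });
  ///    if (engine->Cancel(handle)) {
  ///      // The closure will never run, and it has already been deleted.
  ///    }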
  virtual TaskHandle RunAt(absl::Time when, std::function<void()> closure) = 0;
  /// Request cancellation of a task.
  ///
  /// If the associated closure has already been scheduled to run, it will not
  /// be cancelled, and this function will return false.
  ///
  /// If the associated callback has not been scheduled to run, it will be
  /// cancelled, and the associated std::function or \a Closure* will not be
  /// executed. In this case, Cancel will return true.
  virtual bool Cancel(TaskHandle handle) = 0;
};

// TODO(hork): finalize the API and document it. We need to firm up the story
// around user-provided EventEngines.
std::shared_ptr<EventEngine> DefaultEventEngineFactory();

}  // namespace experimental
}  // namespace grpc_event_engine

#endif  // GRPC_EVENT_ENGINE_EVENT_ENGINE_H