1 // Copyright 2021 The gRPC Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 #ifndef GRPC_EVENT_ENGINE_EVENT_ENGINE_H 15 #define GRPC_EVENT_ENGINE_EVENT_ENGINE_H 16 17 #include <grpc/support/port_platform.h> 18 19 #include <functional> 20 #include <vector> 21 22 #include "absl/status/status.h" 23 #include "absl/status/statusor.h" 24 #include "absl/time/time.h" 25 26 #include <grpc/event_engine/endpoint_config.h> 27 #include <grpc/event_engine/memory_allocator.h> 28 #include <grpc/event_engine/port.h> 29 30 // TODO(hork): Define the Endpoint::Write metrics collection system 31 namespace grpc_event_engine { 32 namespace experimental { 33 34 //////////////////////////////////////////////////////////////////////////////// 35 /// The EventEngine encapsulates all platform-specific behaviors related to low 36 /// level network I/O, timers, asynchronous execution, and DNS resolution. 37 /// 38 /// This interface allows developers to provide their own event management and 39 /// network stacks. Motivating uses cases for supporting custom EventEngines 40 /// include the ability to hook into external event loops, and using different 41 /// EventEngine instances for each channel to better insulate network I/O and 42 /// callback processing from other channels. 43 /// 44 /// A default cross-platform EventEngine instance is provided by gRPC. 45 /// 46 /// LIFESPAN AND OWNERSHIP 47 /// 48 /// gRPC takes shared ownership of EventEngines via std::shared_ptrs to ensure 49 /// that the engines remain available until they are no longer needed. Depending 50 /// on the use case, engines may live until gRPC is shut down. 51 /// 52 /// EXAMPLE USAGE (Not yet implemented) 53 /// 54 /// Custom EventEngines can be specified per channel, and allow configuration 55 /// for both clients and servers. To set a custom EventEngine for a client 56 /// channel, you can do something like the following: 57 /// 58 /// ChannelArguments args; 59 /// std::shared_ptr<EventEngine> engine = std::make_shared<MyEngine>(...); 60 /// args.SetEventEngine(engine); 61 /// MyAppClient client(grpc::CreateCustomChannel( 62 /// "localhost:50051", grpc::InsecureChannelCredentials(), args)); 63 /// 64 /// A gRPC server can use a custom EventEngine by calling the 65 /// ServerBuilder::SetEventEngine method: 66 /// 67 /// ServerBuilder builder; 68 /// std::shared_ptr<EventEngine> engine = std::make_shared<MyEngine>(...); 69 /// builder.SetEventEngine(engine); 70 /// std::unique_ptr<Server> server(builder.BuildAndStart()); 71 /// server->Wait(); 72 /// 73 //////////////////////////////////////////////////////////////////////////////// 74 class EventEngine { 75 public: 76 /// A custom closure type for EventEngine task execution. 77 /// 78 /// Throughout the EventEngine API, \a Closure ownership is retained by the 79 /// caller - the EventEngine will never delete a Closure, and upon 80 /// cancellation, the EventEngine will simply forget the Closure exists. The 81 /// caller is responsible for all necessary cleanup. 82 class Closure { 83 public: 84 Closure() = default; 85 // Closure's are an interface, and thus non-copyable. 86 Closure(const Closure&) = delete; 87 Closure& operator=(const Closure&) = delete; 88 // Polymorphic type => virtual destructor 89 virtual ~Closure() = default; 90 // Run the contained code. 91 virtual void Run() = 0; 92 }; 93 /// Represents a scheduled task. 94 /// 95 /// \a TaskHandles are returned by \a Run* methods, and can be given to the 96 /// \a Cancel method. 97 struct TaskHandle { 98 intptr_t keys[2]; 99 }; 100 /// Thin wrapper around a platform-specific sockaddr type. A sockaddr struct 101 /// exists on all platforms that gRPC supports. 102 /// 103 /// Platforms are expected to provide definitions for: 104 /// * sockaddr 105 /// * sockaddr_in 106 /// * sockaddr_in6 107 class ResolvedAddress { 108 public: 109 static constexpr socklen_t MAX_SIZE_BYTES = 128; 110 111 ResolvedAddress(const sockaddr* address, socklen_t size); 112 ResolvedAddress() = default; 113 ResolvedAddress(const ResolvedAddress&) = default; 114 const struct sockaddr* address() const; 115 socklen_t size() const; 116 117 private: 118 char address_[MAX_SIZE_BYTES]; 119 socklen_t size_ = 0; 120 }; 121 122 /// One end of a connection between a gRPC client and server. Endpoints are 123 /// created when connections are established, and Endpoint operations are 124 /// gRPC's primary means of communication. 125 /// 126 /// Endpoints must use the provided MemoryAllocator for all data buffer memory 127 /// allocations. gRPC allows applications to set memory constraints per 128 /// Channel or Server, and the implementation depends on all dynamic memory 129 /// allocation being handled by the quota system. 130 class Endpoint { 131 public: 132 /// Shuts down all connections and invokes all pending read or write 133 /// callbacks with an error status. 134 virtual ~Endpoint() = default; 135 /// Reads data from the Endpoint. 136 /// 137 /// When data is available on the connection, that data is moved into the 138 /// \a buffer, and the \a on_read callback is called. The caller must ensure 139 /// that the callback has access to the buffer when executed later. 140 /// Ownership of the buffer is not transferred. Valid slices *may* be placed 141 /// into the buffer even if the callback is invoked with a non-OK Status. 142 /// 143 /// There can be at most one outstanding read per Endpoint at any given 144 /// time. An outstanding read is one in which the \a on_read callback has 145 /// not yet been executed for some previous call to \a Read. If an attempt 146 /// is made to call \a Read while a previous read is still outstanding, the 147 /// \a EventEngine must abort. 148 /// 149 /// For failed read operations, implementations should pass the appropriate 150 /// statuses to \a on_read. For example, callbacks might expect to receive 151 /// CANCELLED on endpoint shutdown. 152 virtual void Read(std::function<void(absl::Status)> on_read, 153 SliceBuffer* buffer) = 0; 154 /// Writes data out on the connection. 155 /// 156 /// \a on_writable is called when the connection is ready for more data. The 157 /// Slices within the \a data buffer may be mutated at will by the Endpoint 158 /// until \a on_writable is called. The \a data SliceBuffer will remain 159 /// valid after calling \a Write, but its state is otherwise undefined. All 160 /// bytes in \a data must have been written before calling \a on_writable 161 /// unless an error has occurred. 162 /// 163 /// There can be at most one outstanding write per Endpoint at any given 164 /// time. An outstanding write is one in which the \a on_writable callback 165 /// has not yet been executed for some previous call to \a Write. If an 166 /// attempt is made to call \a Write while a previous write is still 167 /// outstanding, the \a EventEngine must abort. 168 /// 169 /// For failed write operations, implementations should pass the appropriate 170 /// statuses to \a on_writable. For example, callbacks might expect to 171 /// receive CANCELLED on endpoint shutdown. 172 virtual void Write(std::function<void(absl::Status)> on_writable, 173 SliceBuffer* data) = 0; 174 /// Returns an address in the format described in DNSResolver. The returned 175 /// values are expected to remain valid for the life of the Endpoint. 176 virtual const ResolvedAddress& GetPeerAddress() const = 0; 177 virtual const ResolvedAddress& GetLocalAddress() const = 0; 178 }; 179 180 /// Called when a new connection is established. 181 /// 182 /// If the connection attempt was not successful, implementations should pass 183 /// the appropriate statuses to this callback. For example, callbacks might 184 /// expect to receive DEADLINE_EXCEEDED statuses when appropriate, or 185 /// CANCELLED statuses on EventEngine shutdown. 186 using OnConnectCallback = 187 std::function<void(absl::StatusOr<std::unique_ptr<Endpoint>>)>; 188 189 /// Listens for incoming connection requests from gRPC clients and initiates 190 /// request processing once connections are established. 191 class Listener { 192 public: 193 /// Called when the listener has accepted a new client connection. 194 using AcceptCallback = std::function<void( 195 std::unique_ptr<Endpoint>, MemoryAllocator memory_allocator)>; 196 virtual ~Listener() = default; 197 /// Bind an address/port to this Listener. 198 /// 199 /// It is expected that multiple addresses/ports can be bound to this 200 /// Listener before Listener::Start has been called. Returns either the 201 /// bound port or an appropriate error status. 202 virtual absl::StatusOr<int> Bind(const ResolvedAddress& addr) = 0; 203 virtual absl::Status Start() = 0; 204 }; 205 206 /// Factory method to create a network listener / server. 207 /// 208 /// Once a \a Listener is created and started, the \a on_accept callback will 209 /// be called once asynchronously for each established connection. This method 210 /// may return a non-OK status immediately if an error was encountered in any 211 /// synchronous steps required to create the Listener. In this case, 212 /// \a on_shutdown will never be called. 213 /// 214 /// If this method returns a Listener, then \a on_shutdown will be invoked 215 /// exactly once, when the Listener is shut down. The status passed to it will 216 /// indicate if there was a problem during shutdown. 217 /// 218 /// The provided \a MemoryAllocatorFactory is used to create \a 219 /// MemoryAllocators for Endpoint construction. 220 virtual absl::StatusOr<std::unique_ptr<Listener>> CreateListener( 221 Listener::AcceptCallback on_accept, 222 std::function<void(absl::Status)> on_shutdown, 223 const EndpointConfig& config, 224 std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) = 0; 225 /// Creates a client network connection to a remote network listener. 226 /// 227 /// May return an error status immediately if there was a failure in the 228 /// synchronous part of establishing a connection. In that event, the \a 229 /// on_connect callback *will not* have been executed. Otherwise, it is 230 /// expected that the \a on_connect callback will be asynchronously executed 231 /// exactly once by the EventEngine. 232 /// 233 /// Implementation Note: it is important that the \a memory_allocator be used 234 /// for all read/write buffer allocations in the EventEngine implementation. 235 /// This allows gRPC's \a ResourceQuota system to monitor and control memory 236 /// usage with graceful degradation mechanisms. Please see the \a 237 /// MemoryAllocator API for more information. 238 virtual absl::Status Connect(OnConnectCallback on_connect, 239 const ResolvedAddress& addr, 240 const EndpointConfig& args, 241 MemoryAllocator memory_allocator, 242 absl::Time deadline) = 0; 243 244 /// Provides asynchronous resolution. 245 class DNSResolver { 246 public: 247 /// Task handle for DNS Resolution requests. 248 struct LookupTaskHandle { 249 intptr_t key[2]; 250 }; 251 /// DNS SRV record type. 252 struct SRVRecord { 253 std::string host; 254 int port = 0; 255 int priority = 0; 256 int weight = 0; 257 }; 258 /// Called with the collection of sockaddrs that were resolved from a given 259 /// target address. 260 using LookupHostnameCallback = 261 std::function<void(absl::StatusOr<std::vector<ResolvedAddress>>)>; 262 /// Called with a collection of SRV records. 263 using LookupSRVCallback = 264 std::function<void(absl::StatusOr<std::vector<SRVRecord>>)>; 265 /// Called with the result of a TXT record lookup 266 using LookupTXTCallback = std::function<void(absl::StatusOr<std::string>)>; 267 268 virtual ~DNSResolver() = default; 269 270 /// Asynchronously resolve an address. 271 /// 272 /// \a default_port may be a non-numeric named service port, and will only 273 /// be used if \a address does not already contain a port component. 274 /// 275 /// When the lookup is complete, the \a on_resolve callback will be invoked 276 /// with a status indicating the success or failure of the lookup. 277 /// Implementations should pass the appropriate statuses to the callback. 278 /// For example, callbacks might expect to receive DEADLINE_EXCEEDED or 279 /// NOT_FOUND. 280 /// 281 /// If cancelled, \a on_resolve will not be executed. 282 virtual LookupTaskHandle LookupHostname(LookupHostnameCallback on_resolve, 283 absl::string_view address, 284 absl::string_view default_port, 285 absl::Time deadline) = 0; 286 /// Asynchronously perform an SRV record lookup. 287 /// 288 /// \a on_resolve has the same meaning and expectations as \a 289 /// LookupHostname's \a on_resolve callback. 290 virtual LookupTaskHandle LookupSRV(LookupSRVCallback on_resolve, 291 absl::string_view name, 292 absl::Time deadline) = 0; 293 /// Asynchronously perform a TXT record lookup. 294 /// 295 /// \a on_resolve has the same meaning and expectations as \a 296 /// LookupHostname's \a on_resolve callback. 297 virtual LookupTaskHandle LookupTXT(LookupTXTCallback on_resolve, 298 absl::string_view name, 299 absl::Time deadline) = 0; 300 /// Cancel an asynchronous lookup operation. 301 /// 302 /// This shares the same semantics with \a EventEngine::Cancel: successfully 303 /// cancelled lookups will not have their callbacks executed, and this 304 /// method returns true. 305 virtual bool CancelLookup(LookupTaskHandle handle) = 0; 306 }; 307 308 /// At time of destruction, the EventEngine must have no active 309 /// responsibilities. EventEngine users (applications) are responsible for 310 /// cancelling all tasks and DNS lookups, shutting down listeners and 311 /// endpoints, prior to EventEngine destruction. If there are any outstanding 312 /// tasks, any running listeners, etc. at time of EventEngine destruction, 313 /// that is an invalid use of the API, and it will result in undefined 314 /// behavior. 315 virtual ~EventEngine() = default; 316 317 // TODO(nnoble): consider whether we can remove this method before we 318 // de-experimentalize this API. 319 virtual bool IsWorkerThread() = 0; 320 321 /// Creates and returns an instance of a DNSResolver. 322 virtual std::unique_ptr<DNSResolver> GetDNSResolver() = 0; 323 324 /// Asynchronously executes a task as soon as possible. 325 /// 326 /// \a Closures scheduled with \a Run cannot be cancelled. The \a closure will 327 /// not be deleted after it has been run, ownership remains with the caller. 328 virtual void Run(Closure* closure) = 0; 329 /// Asynchronously executes a task as soon as possible. 330 /// 331 /// \a Closures scheduled with \a Run cannot be cancelled. Unlike the 332 /// overloaded \a Closure alternative, the std::function version's \a closure 333 /// will be deleted by the EventEngine after the closure has been run. 334 /// 335 /// This version of \a Run may be less performant than the \a Closure version 336 /// in some scenarios. This overload is useful in situations where performance 337 /// is not a critical concern. 338 virtual void Run(std::function<void()> closure) = 0; 339 /// Synonymous with scheduling an alarm to run at time \a when. 340 /// 341 /// The \a closure will execute when time \a when arrives unless it has been 342 /// cancelled via the \a Cancel method. If cancelled, the closure will not be 343 /// run, nor will it be deleted. Ownership remains with the caller. 344 virtual TaskHandle RunAt(absl::Time when, Closure* closure) = 0; 345 /// Synonymous with scheduling an alarm to run at time \a when. 346 /// 347 /// The \a closure will execute when time \a when arrives unless it has been 348 /// cancelled via the \a Cancel method. If cancelled, the closure will not be 349 /// run. Unilke the overloaded \a Closure alternative, the std::function 350 /// version's \a closure will be deleted by the EventEngine after the closure 351 /// has been run, or upon cancellation. 352 /// 353 /// This version of \a RunAt may be less performant than the \a Closure 354 /// version in some scenarios. This overload is useful in situations where 355 /// performance is not a critical concern. 356 virtual TaskHandle RunAt(absl::Time when, std::function<void()> closure) = 0; 357 /// Request cancellation of a task. 358 /// 359 /// If the associated closure has already been scheduled to run, it will not 360 /// be cancelled, and this function will return false. 361 /// 362 /// If the associated callback has not been scheduled to run, it will be 363 /// cancelled, and the associated std::function or \a Closure* will not be 364 /// executed. In this case, Cancel will return true. 365 virtual bool Cancel(TaskHandle handle) = 0; 366 }; 367 368 // TODO(hork): finalize the API and document it. We need to firm up the story 369 // around user-provided EventEngines. 370 std::shared_ptr<EventEngine> DefaultEventEngineFactory(); 371 372 } // namespace experimental 373 } // namespace grpc_event_engine 374 375 #endif // GRPC_EVENT_ENGINE_EVENT_ENGINE_H 376