1 /* 2 Copyright (c) 2014-2017 DataStax 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef DATASTAX_INTERNAL_SOCKET_HPP 18 #define DATASTAX_INTERNAL_SOCKET_HPP 19 20 #include "allocated.hpp" 21 #include "buffer.hpp" 22 #include "constants.hpp" 23 #include "list.hpp" 24 #include "scoped_ptr.hpp" 25 #include "ssl.hpp" 26 #include "stack.hpp" 27 #include "tcp_connector.hpp" 28 #include "timer.hpp" 29 30 #include <uv.h> 31 32 #define MIN_BUFFERS_SIZE 128 33 34 namespace datastax { namespace internal { namespace core { 35 36 class Socket; 37 class SocketWriteBase; 38 39 /** 40 * A generic socket request that handles encoding data to be written to a 41 * socket. 42 */ 43 class SocketRequest 44 : public Allocated 45 , public List<SocketRequest>::Node { 46 public: ~SocketRequest()47 virtual ~SocketRequest() {} 48 49 enum { 50 SOCKET_REQUEST_ERROR_CLOSED = CASS_INT32_MIN, 51 SOCKET_REQUEST_ERROR_NO_HANDLER, 52 SOCKET_REQUEST_ERROR_LAST_ENTRY 53 }; 54 55 /** 56 * Encode a request into buffers. 57 * 58 * @param bufs 59 * @return The number of bytes written, or negative for an error 60 */ 61 virtual int32_t encode(BufferVec* bufs) = 0; 62 63 /** 64 * Handle a socket closing during a request. 65 */ 66 virtual void on_close() = 0; 67 }; 68 69 /** 70 * A basic socket request that appends a buffer to the encode buffers. 71 */ 72 class BufferSocketRequest : public SocketRequest { 73 public: 74 /** 75 * Constructor 76 * 77 * @param buf A buffer to append to buffers 78 */ BufferSocketRequest(const Buffer & buf)79 BufferSocketRequest(const Buffer& buf) 80 : buf_(buf) {} 81 encode(BufferVec * bufs)82 virtual int32_t encode(BufferVec* bufs) { 83 bufs->push_back(buf_); 84 return buf_.size(); 85 } 86 on_close()87 virtual void on_close() {} 88 89 private: 90 Buffer buf_; 91 }; 92 93 /** 94 * A generic handler for handling the basic actions of a socket. This allows 95 * sockets to handle different ways of processing the socket's 96 * incoming/outgoing data streams e.g. encryption/decryption, 97 * compression/decompression, etc. 98 */ 99 class SocketHandlerBase : public Allocated { 100 public: ~SocketHandlerBase()101 virtual ~SocketHandlerBase() {} 102 103 /** 104 * Allocate a write request. 105 * 106 * @param socket The socket requesting the new write request. 107 * @return A newly allocated write request. 108 */ 109 virtual SocketWriteBase* new_pending_write(Socket* socket) = 0; 110 /** 111 * Allocate a buffer for reading data from the socket. 112 * 113 * @param suggested_size The suggested size for the read buffer. 114 * @param buf The allocated buffer. 115 */ 116 virtual void alloc_buffer(size_t suggested_size, uv_buf_t* buf) = 0; 117 118 /** 119 * A callback for handling a socket read. 120 * 121 * @param socket The socket receiving the read. 122 * @param nread The size of the read or an error if negative. 123 * @param buf The buffer read from the socket. 124 */ 125 virtual void on_read(Socket* socket, ssize_t nread, const uv_buf_t* buf) = 0; 126 127 /** 128 * A callback for handling a socket write. 129 * 130 * @param socket The socket that processed the write. 131 * @param status The status of the write. 0 if successful, otherwise and error. 132 * @param request The request doing the write. 133 */ 134 virtual void on_write(Socket* socket, int status, SocketRequest* request) = 0; 135 136 /** 137 * A callback for handling the socket close. 138 */ 139 virtual void on_close() = 0; 140 }; 141 142 /** 143 * A basic socket handler that caches buffers used for reading socket data. 144 */ 145 class SocketHandler : public SocketHandlerBase { 146 public: 147 ~SocketHandler(); 148 149 virtual SocketWriteBase* new_pending_write(Socket* socket); 150 virtual void alloc_buffer(size_t suggested_size, uv_buf_t* buf); 151 152 /** 153 * Free or cache a read buffer. 154 * @param buf The buffer to free or cache. The buffer was created in 155 * alloc_buffer(). 156 */ 157 void free_buffer(const uv_buf_t* buf); 158 159 private: 160 Stack<uv_buf_t> buffer_reuse_list_; 161 }; 162 163 /** 164 * A socket handler that encrypts/decrypts socket data using SSL. 165 */ 166 class SslSocketHandler : public SocketHandlerBase { 167 public: 168 /** 169 * Constructor 170 * 171 * @param ssl_session A SSL session used to encrypt/decrypt data. 172 */ SslSocketHandler(SslSession * ssl_session)173 SslSocketHandler(SslSession* ssl_session) 174 : ssl_session_(ssl_session) {} 175 176 virtual SocketWriteBase* new_pending_write(Socket* socket); 177 virtual void alloc_buffer(size_t suggested_size, uv_buf_t* buf); 178 179 virtual void on_read(Socket* socket, ssize_t nread, const uv_buf_t* buf); 180 181 /** 182 * A callback for handling decrypted socket data. 183 * 184 * @param socket The socket receiving the read. 185 * @param buf The decrypted data. 186 * @param size The number of bytes decrypted. 187 */ 188 virtual void on_ssl_read(Socket* socket, char* buf, size_t size) = 0; 189 190 private: 191 ScopedPtr<SslSession> ssl_session_; 192 }; 193 194 /** 195 * A generic write handler. This is used to coalesce several write requests 196 * into a flush (a write() system call). 197 */ 198 class SocketWriteBase 199 : public Allocated 200 , public List<SocketWriteBase>::Node { 201 public: 202 typedef internal::List<SocketWriteBase> List; 203 204 /** 205 * Constructor 206 * 207 * @param The socket handling the write. 208 */ SocketWriteBase(Socket * socket)209 SocketWriteBase(Socket* socket) 210 : socket_(socket) 211 , is_flushed_(false) { 212 req_.data = this; 213 buffers_.reserve(MIN_BUFFERS_SIZE); 214 } 215 ~SocketWriteBase()216 virtual ~SocketWriteBase() {} 217 218 uv_tcp_t* tcp(); 219 220 /** 221 * Returns the flush status of the socket write. 222 * 223 * @return true if the requests have been flushed. 224 */ is_flushed() const225 bool is_flushed() const { return is_flushed_; } 226 227 /** 228 * Clear the write so that it can be reused for more requests. 229 */ clear()230 void clear() { 231 buffers_.clear(); 232 requests_.clear(); 233 is_flushed_ = false; 234 } 235 236 /** 237 * Handle a socket closing by calling request's on_close() callbacks. 238 */ 239 void on_close(); 240 241 /** 242 * Add a request to this write request. 243 * 244 * @param The request to add. 245 * @return The number of bytes written. It's negative if an error occurred. 246 */ 247 int32_t write(SocketRequest* request); 248 249 /** 250 * Flush the outstanding requests to the socket. 251 */ 252 virtual size_t flush() = 0; 253 254 protected: 255 static void on_write(uv_write_t* req, int status); 256 void handle_write(uv_write_t* req, int status); 257 258 typedef Vector<SocketRequest*> RequestVec; 259 260 Socket* socket_; 261 uv_write_t req_; 262 bool is_flushed_; 263 BufferVec buffers_; 264 RequestVec requests_; 265 }; 266 267 /** 268 * A socket. It cannot be connected directly, use a SocketConnector to connect 269 * a socket. 270 * 271 * @see SocketConnector 272 */ 273 class Socket : public RefCounted<Socket> { 274 friend class SocketConnector; 275 friend class SocketWriteBase; 276 277 public: 278 typedef SharedRefPtr<Socket> Ptr; 279 280 /** 281 * Constructor: Don't use this directly. 282 * 283 * @param address The address for the socket. 284 * @param max_reusable_write_objects The max limit on the number of write buffer 285 * objects to keep around. 286 */ 287 Socket(const Address& address, size_t max_reusable_write_objects); 288 289 /** 290 * Destructor. 291 */ 292 ~Socket(); 293 294 /** 295 * Set the handler that will process the actions for this socket. 296 * 297 * @param handler The socket handler. 298 */ 299 void set_handler(SocketHandlerBase* handler); 300 301 /** 302 * Write a request to the socket and coalesce with outstanding requests. This 303 * method doesn't flush. 304 * 305 * @param request The request to write to the socket. 306 * @return The number of bytes written by the request to the socket. 307 */ 308 int32_t write(SocketRequest* request); 309 310 /** 311 * Write a request to the socket and flush immediately. 312 * 313 * @param request The request to write to the socket. 314 * @return The number of bytes written by the request to the socket. 315 */ 316 int32_t write_and_flush(SocketRequest* request); 317 318 /** 319 * Flush all outstanding requests. 320 */ 321 size_t flush(); 322 323 /** 324 * Determine if the socket is closing. 325 * 326 * @return Returns true if closing. 327 */ 328 329 bool is_closing() const; 330 331 /** 332 * Close the socket and notify all outstanding requests. 333 */ 334 void close(); 335 336 /** 337 * Determine if the socket is defunct. 338 * 339 * @return Returns true if defunct. 340 */ is_defunct() const341 bool is_defunct() const { return is_defunct_; } 342 343 /** 344 * Mark as defunct and close the socket. 345 */ 346 void defunct(); 347 348 public: handle()349 uv_tcp_t* handle() { return &tcp_; } loop()350 uv_loop_t* loop() { return tcp_.loop; } 351 address() const352 const Address& address() const { return address_; } 353 354 private: 355 static void alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); 356 357 static void on_read(uv_stream_t* client, ssize_t nread, const uv_buf_t* buf); 358 void handle_read(ssize_t nread, const uv_buf_t* buf); 359 360 static void on_close(uv_handle_t* handle); 361 void handle_close(); 362 363 void cleanup_free_writes(); 364 365 private: 366 typedef Vector<SocketWriteBase*> SocketWriteVec; 367 368 uv_tcp_t tcp_; 369 ScopedPtr<SocketHandlerBase> handler_; 370 371 SocketWriteBase::List pending_writes_; 372 SocketWriteVec free_writes_; 373 374 bool is_defunct_; 375 size_t max_reusable_write_objects_; 376 377 Address address_; 378 }; 379 380 }}} // namespace datastax::internal::core 381 382 #endif 383