1 /*
2   Copyright (c) 2014-2017 DataStax
3 
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7 
8   http://www.apache.org/licenses/LICENSE-2.0
9 
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15 */
16 
17 #ifndef DATASTAX_INTERNAL_SOCKET_HPP
18 #define DATASTAX_INTERNAL_SOCKET_HPP
19 
20 #include "allocated.hpp"
21 #include "buffer.hpp"
22 #include "constants.hpp"
23 #include "list.hpp"
24 #include "scoped_ptr.hpp"
25 #include "ssl.hpp"
26 #include "stack.hpp"
27 #include "tcp_connector.hpp"
28 #include "timer.hpp"
29 
30 #include <uv.h>
31 
// Initial capacity reserved for the buffer vector of every SocketWriteBase
// (see the SocketWriteBase constructor).
#define MIN_BUFFERS_SIZE 128
33 
34 namespace datastax { namespace internal { namespace core {
35 
// Forward declarations: SocketHandlerBase refers to both types by pointer
// before their full definitions appear later in this header.
class Socket;
class SocketWriteBase;
38 
39 /**
40  * A generic socket request that handles encoding data to be written to a
41  * socket.
42  */
43 class SocketRequest
44     : public Allocated
45     , public List<SocketRequest>::Node {
46 public:
~SocketRequest()47   virtual ~SocketRequest() {}
48 
49   enum {
50     SOCKET_REQUEST_ERROR_CLOSED = CASS_INT32_MIN,
51     SOCKET_REQUEST_ERROR_NO_HANDLER,
52     SOCKET_REQUEST_ERROR_LAST_ENTRY
53   };
54 
55   /**
56    * Encode a request into buffers.
57    *
58    * @param bufs
59    * @return The number of bytes written, or negative for an error
60    */
61   virtual int32_t encode(BufferVec* bufs) = 0;
62 
63   /**
64    * Handle a socket closing during a request.
65    */
66   virtual void on_close() = 0;
67 };
68 
69 /**
70  * A basic socket request that appends a buffer to the encode buffers.
71  */
72 class BufferSocketRequest : public SocketRequest {
73 public:
74   /**
75    * Constructor
76    *
77    * @param buf A buffer to append to buffers
78    */
BufferSocketRequest(const Buffer & buf)79   BufferSocketRequest(const Buffer& buf)
80       : buf_(buf) {}
81 
encode(BufferVec * bufs)82   virtual int32_t encode(BufferVec* bufs) {
83     bufs->push_back(buf_);
84     return buf_.size();
85   }
86 
on_close()87   virtual void on_close() {}
88 
89 private:
90   Buffer buf_;
91 };
92 
93 /**
94  * A generic handler for handling the basic actions of a socket. This allows
95  * sockets to handle different ways of processing the socket's
96  * incoming/outgoing data streams e.g. encryption/decryption,
97  * compression/decompression, etc.
98  */
99 class SocketHandlerBase : public Allocated {
100 public:
~SocketHandlerBase()101   virtual ~SocketHandlerBase() {}
102 
103   /**
104    * Allocate a write request.
105    *
106    * @param socket The socket requesting the new write request.
107    * @return A newly allocated write request.
108    */
109   virtual SocketWriteBase* new_pending_write(Socket* socket) = 0;
110   /**
111    * Allocate a buffer for reading data from the socket.
112    *
113    * @param suggested_size The suggested size for the read buffer.
114    * @param buf The allocated buffer.
115    */
116   virtual void alloc_buffer(size_t suggested_size, uv_buf_t* buf) = 0;
117 
118   /**
119    * A callback for handling a socket read.
120    *
121    * @param socket The socket receiving the read.
122    * @param nread The size of the read or an error if negative.
123    * @param buf The buffer read from the socket.
124    */
125   virtual void on_read(Socket* socket, ssize_t nread, const uv_buf_t* buf) = 0;
126 
127   /**
128    * A callback for handling a socket write.
129    *
130    * @param socket The socket that processed the write.
131    * @param status The status of the write. 0 if successful, otherwise and error.
132    * @param request The request doing the write.
133    */
134   virtual void on_write(Socket* socket, int status, SocketRequest* request) = 0;
135 
136   /**
137    * A callback for handling the socket close.
138    */
139   virtual void on_close() = 0;
140 };
141 
142 /**
143  * A basic socket handler that caches buffers used for reading socket data.
144  */
145 class SocketHandler : public SocketHandlerBase {
146 public:
147   ~SocketHandler();
148 
149   virtual SocketWriteBase* new_pending_write(Socket* socket);
150   virtual void alloc_buffer(size_t suggested_size, uv_buf_t* buf);
151 
152   /**
153    * Free or cache a read buffer.
154    * @param buf The buffer to free or cache. The buffer was created in
155    * alloc_buffer().
156    */
157   void free_buffer(const uv_buf_t* buf);
158 
159 private:
160   Stack<uv_buf_t> buffer_reuse_list_;
161 };
162 
163 /**
164  * A socket handler that encrypts/decrypts socket data using SSL.
165  */
166 class SslSocketHandler : public SocketHandlerBase {
167 public:
168   /**
169    * Constructor
170    *
171    * @param ssl_session A SSL session used to encrypt/decrypt data.
172    */
SslSocketHandler(SslSession * ssl_session)173   SslSocketHandler(SslSession* ssl_session)
174       : ssl_session_(ssl_session) {}
175 
176   virtual SocketWriteBase* new_pending_write(Socket* socket);
177   virtual void alloc_buffer(size_t suggested_size, uv_buf_t* buf);
178 
179   virtual void on_read(Socket* socket, ssize_t nread, const uv_buf_t* buf);
180 
181   /**
182    * A callback for handling decrypted socket data.
183    *
184    * @param socket The socket receiving the read.
185    * @param buf The decrypted data.
186    * @param size The number of bytes decrypted.
187    */
188   virtual void on_ssl_read(Socket* socket, char* buf, size_t size) = 0;
189 
190 private:
191   ScopedPtr<SslSession> ssl_session_;
192 };
193 
194 /**
195  * A generic write handler. This is used to coalesce several write requests
196  * into a flush (a write() system call).
197  */
198 class SocketWriteBase
199     : public Allocated
200     , public List<SocketWriteBase>::Node {
201 public:
202   typedef internal::List<SocketWriteBase> List;
203 
204   /**
205    * Constructor
206    *
207    * @param The socket handling the write.
208    */
SocketWriteBase(Socket * socket)209   SocketWriteBase(Socket* socket)
210       : socket_(socket)
211       , is_flushed_(false) {
212     req_.data = this;
213     buffers_.reserve(MIN_BUFFERS_SIZE);
214   }
215 
~SocketWriteBase()216   virtual ~SocketWriteBase() {}
217 
218   uv_tcp_t* tcp();
219 
220   /**
221    * Returns the flush status of the socket write.
222    *
223    * @return true if the requests have been flushed.
224    */
is_flushed() const225   bool is_flushed() const { return is_flushed_; }
226 
227   /**
228    * Clear the write so that it can be reused for more requests.
229    */
clear()230   void clear() {
231     buffers_.clear();
232     requests_.clear();
233     is_flushed_ = false;
234   }
235 
236   /**
237    * Handle a socket closing by calling request's on_close() callbacks.
238    */
239   void on_close();
240 
241   /**
242    * Add a request to this write request.
243    *
244    * @param The request to add.
245    * @return The number of bytes written. It's negative if an error occurred.
246    */
247   int32_t write(SocketRequest* request);
248 
249   /**
250    * Flush the outstanding requests to the socket.
251    */
252   virtual size_t flush() = 0;
253 
254 protected:
255   static void on_write(uv_write_t* req, int status);
256   void handle_write(uv_write_t* req, int status);
257 
258   typedef Vector<SocketRequest*> RequestVec;
259 
260   Socket* socket_;
261   uv_write_t req_;
262   bool is_flushed_;
263   BufferVec buffers_;
264   RequestVec requests_;
265 };
266 
267 /**
268  * A socket. It cannot be connected directly, use a SocketConnector to connect
269  * a socket.
270  *
271  * @see SocketConnector
272  */
273 class Socket : public RefCounted<Socket> {
274   friend class SocketConnector;
275   friend class SocketWriteBase;
276 
277 public:
278   typedef SharedRefPtr<Socket> Ptr;
279 
280   /**
281    * Constructor: Don't use this directly.
282    *
283    * @param address The address for the socket.
284    * @param max_reusable_write_objects The max limit on the number of write buffer
285    * objects to keep around.
286    */
287   Socket(const Address& address, size_t max_reusable_write_objects);
288 
289   /**
290    * Destructor.
291    */
292   ~Socket();
293 
294   /**
295    * Set the handler that will process the actions for this socket.
296    *
297    * @param handler The socket handler.
298    */
299   void set_handler(SocketHandlerBase* handler);
300 
301   /**
302    * Write a request to the socket and coalesce with outstanding requests. This
303    * method doesn't flush.
304    *
305    * @param request The request to write to the socket.
306    * @return The number of bytes written by the request to the socket.
307    */
308   int32_t write(SocketRequest* request);
309 
310   /**
311    * Write a request to the socket and flush immediately.
312    *
313    * @param request The request to write to the socket.
314    * @return The number of bytes written by the request to the socket.
315    */
316   int32_t write_and_flush(SocketRequest* request);
317 
318   /**
319    * Flush all outstanding requests.
320    */
321   size_t flush();
322 
323   /**
324    * Determine if the socket is closing.
325    *
326    * @return Returns true if closing.
327    */
328 
329   bool is_closing() const;
330 
331   /**
332    * Close the socket and notify all outstanding requests.
333    */
334   void close();
335 
336   /**
337    * Determine if the socket is defunct.
338    *
339    * @return Returns true if defunct.
340    */
is_defunct() const341   bool is_defunct() const { return is_defunct_; }
342 
343   /**
344    * Mark as defunct and close the socket.
345    */
346   void defunct();
347 
348 public:
handle()349   uv_tcp_t* handle() { return &tcp_; }
loop()350   uv_loop_t* loop() { return tcp_.loop; }
351 
address() const352   const Address& address() const { return address_; }
353 
354 private:
355   static void alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
356 
357   static void on_read(uv_stream_t* client, ssize_t nread, const uv_buf_t* buf);
358   void handle_read(ssize_t nread, const uv_buf_t* buf);
359 
360   static void on_close(uv_handle_t* handle);
361   void handle_close();
362 
363   void cleanup_free_writes();
364 
365 private:
366   typedef Vector<SocketWriteBase*> SocketWriteVec;
367 
368   uv_tcp_t tcp_;
369   ScopedPtr<SocketHandlerBase> handler_;
370 
371   SocketWriteBase::List pending_writes_;
372   SocketWriteVec free_writes_;
373 
374   bool is_defunct_;
375   size_t max_reusable_write_objects_;
376 
377   Address address_;
378 };
379 
380 }}} // namespace datastax::internal::core
381 
382 #endif
383