1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ 2 /* 3 * Copyright 2014 Couchbase, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 #ifndef LCBIO_CTXEASY_H 19 #define LCBIO_CTXEASY_H 20 #include "connect.h" 21 #include "rdb/rope.h" 22 #include "ringbuffer.h" 23 24 #ifdef __cplusplus 25 extern "C" { 26 #endif 27 28 /** 29 * @file 30 * This file contains routines for reading and writing from and to a socket 31 */ 32 33 /** 34 * @ingroup lcbio 35 * @defgroup lcbio-ctx Reading/Writing Routines 36 * 37 * @details 38 * 39 * # Attaching 40 * 41 * A context is first _attached_ to a socket and a _user-defined_ data object. 42 * The idea is that the context is the broker which schedules I/O on behalf 43 * of the application to the socket, and then receives events from the socket, 44 * passing it along to user-defined data. 45 * 46 * To create a new context, invoke the lcbio_ctx_new() function. When you are done 47 * with it call the lcbio_ctx_close() function. 48 * 49 * # Reading 50 * 51 * Reading data is done through first requesting an amount of data to read 52 * and then reading the data from the buffers when the lcbio_CTXPROCS#cb_read 53 * handler is invoked. 54 * 55 * Requesting an amount of data to be read may be dependent on some current 56 * parsing context. In cases where the expected message size is known the pattern 57 * is to initially request the size of the header; once the header has been 58 * delivered to the application, the application should request the full 59 * header+body size, and so on. 60 * 61 * For streaming based protocols where the amount of data is not known ahead of 62 * time, requesting a single byte may be sufficient. Note that typically the 63 * read callback will be invoked with _more_ bytes than requested. 64 * 65 * Data is read from the network as one or more chunks to improve performance 66 * and increase flexibility. Because of this model, you must iterate over the 67 * data read _or_ employ the underlying <rdb/rope.h> API. For streaming data 68 * sockets where data may simply be copied to another buffer, use the iterator 69 * API presented here. 70 * 71 * @note The RDB interface requires you to explicitly advance the read 72 * cursor in addition to actually obtaining the data. This is handled within 73 * the iterator interface declared in this file (but of course must be done 74 * manually if employing RDB directly). 75 * 76 * 77 * # Writing 78 * 79 * Writing can be done through a simple lcbio_ctx_put() which simply copies data 80 * to an output buffer, or can be done through more efficient but complex means - 81 * see the lcbio_ctx_wwant() function. 82 * 83 * # Scheduling 84 * 85 * Any I/O **must always be scheduled**. For maximal efficiency the various 86 * read/write functions only set internal flags within the contexts to act 87 * as hints that the application intends to read and write data. However in 88 * order to actually perform these operations, kernel/OS calls are needed. 89 * 90 * To indicate that no subsequent I/O operations will be requested until the next 91 * event loop iteration, _and_ to apply/schedule any existing I/O within the 92 * current iteration, the lcbio_ctx_schedule() function should be called. 93 * 94 * 95 * @addtogroup lcbio-ctx 96 * @{ */ 97 98 typedef struct lcbio_CTX *lcbio_pCTX; 99 100 /** 101 * @brief Handlers for I/O events 102 */ 103 typedef struct { 104 /** Error handler invoked with the context and the received error */ 105 void (*cb_err)(lcbio_pCTX, lcb_error_t); 106 107 /** Read handler invoked with the context and the number of bytes read */ 108 void (*cb_read)(lcbio_pCTX, unsigned total); 109 110 /** Triggered by lcbio_ctx_wwant() */ 111 void (*cb_flush_ready)(lcbio_pCTX); 112 113 /** Triggered when data has been flushed from lcbio_ctx_put_ex() */ 114 void (*cb_flush_done)(lcbio_pCTX, unsigned requested, unsigned nflushed); 115 } lcbio_CTXPROCS; 116 117 /** 118 * Container buffer handle containing a backref to the original context. 119 * @private 120 */ 121 typedef struct { 122 ringbuffer_t rb; 123 lcbio_pCTX parent; 124 } lcbio__EASYRB; 125 126 /** 127 * @brief Context for socket I/O 128 * 129 * The CTX structure represents an ownership of a socket. It provides routines 130 * for reading and writing from and to the socket, as well as associating 131 * application data with the socket. 132 */ 133 typedef struct lcbio_CTX { 134 lcbio_SOCKET *sock; /**< Socket resource */ 135 lcbio_pTABLE io; /**< Cached IO table */ 136 void *data; /**< Associative pointer */ 137 void *event; /**< event pointer for E-model I/O */ 138 lcb_sockdata_t *sd; /**< cached SD for C-model I/O */ 139 lcbio__EASYRB *output; /**< for lcbio_ctx_put() */ 140 lcb_socket_t fd; /**< cached FD for E-model I/O */ 141 char evactive; /**< watcher is active for E-model I/O */ 142 char wwant; /**< flag for lcbio_ctx_put_ex */ 143 char state; /**< internal state */ 144 char entered; /**< inside event handler */ 145 unsigned npending; /**< reference count on pending I/O */ 146 unsigned rdwant; /**< number of remaining bytes to read */ 147 lcb_error_t err; /**< pending error */ 148 rdb_IOROPE ior; /**< for reads */ 149 lcbio_pASYNC as_err; /**< async error handler */ 150 lcbio_CTXPROCS procs; /**< callbacks */ 151 const char *subsys; /**< Informational description of connection */ 152 } lcbio_CTX; 153 154 /**@name Creating and Closing 155 *@{*/ 156 157 /** 158 * Creates a new context object. 159 * 160 * The object remains valid until lcbio_ctx_close() is invoked. 161 * 162 * @param sock the underlying socket object. This function increments the 163 * socket's reference count. 164 * @param data user defined data to associate with context. The data may be 165 * obtained at a later point via lcbio_ctx_data() 166 * 167 * @param procs callback table 168 * @return a new context object. 169 */ 170 lcbio_CTX * 171 lcbio_ctx_new(lcbio_SOCKET *sock, void *data, const lcbio_CTXPROCS *procs); 172 173 174 /** 175 * Callback invoked when the connection is about to be release 176 * @param sock the socket being released. 177 * @param releasable whether the socket may be reused 178 * @param arg an argument passed to the close() function. 179 * 180 * If you wish to reuse the socket (and reusable is true) then the socket's 181 * reference count should be incremented via lcbio_ref(). 182 */ 183 typedef void 184 (*lcbio_CTXCLOSE_cb)(lcbio_SOCKET *sock, int releasable, void *arg); 185 186 /** 187 * @brief Close the context object. 188 * 189 * This will invalidate any pending I/O operations 190 * and subsequent callbacks on the context will not be received. After calling 191 * this function, the pointer will be deemed invalid. 192 * 193 * @param ctx 194 * @param callback a callback to invoke (see above) 195 * @param arg argument passed to the callback 196 */ 197 void 198 lcbio_ctx_close(lcbio_CTX *ctx, lcbio_CTXCLOSE_cb callback, void *arg); 199 200 typedef void (*lcbio_CTXDTOR_cb)(lcbio_pCTX); 201 void 202 lcbio_ctx_close_ex(lcbio_CTX *ctx, lcbio_CTXCLOSE_cb cb, void *cbarg, 203 lcbio_CTXDTOR_cb dtor, void *dtor_arg); 204 /**@}*/ 205 206 207 /**@name Informational Routines 208 * @{*/ 209 210 /** 211 * Get the data associated with the context. This is the pointer specified 212 * during the constructor 213 */ 214 #define lcbio_ctx_data(ctx) (ctx)->data 215 216 /** Get the associated lcbio_SOCKET object */ 217 #define lcbio_ctx_sock(ctx) (ctx)->sock 218 219 /** Dump a textual representation of the context to the screen */ 220 void 221 lcbio_ctx_dump(lcbio_CTX *ctx, FILE *fp); 222 223 /**@}*/ 224 225 /** Asynchronously trigger the error callback */ 226 void 227 lcbio_ctx_senderr(lcbio_CTX *ctx, lcb_error_t err); 228 229 230 /** 231 * Schedule any pending I/O to be scheduled immediately. If data was requested 232 * via lcbio_ctx_rwant() then a request will be sent for reading. If data was 233 * requested to be flushed either via lcbio_ctx_put() or lcbio_ctx_wwant() 234 * then those will be scheduled as well. 235 * 236 * This call is a no-op if invoked from within the current handler, as this 237 * function is invoked implicitly after the I/O handler itself has returned. 238 * 239 * It is safe (though typically not efficient) to invoke this function 240 * multiple times. Each invocation may potentially involve system calls 241 * and buffer allocations, depending on the I/O plugin being used. 242 */ 243 void 244 lcbio_ctx_schedule(lcbio_CTX *ctx); 245 246 /** 247 * @brief Add output data to the write buffer. 248 * 249 * The data added is copied to an internal buffer and flushed to the network 250 * when appropriate. If you wish to have more control over how the data is written 251 * then see the lcbio_ctx_wwant() function. 252 * 253 * @param ctx 254 * @param buf the buffer to write 255 * @param nbuf the size of the buffer to write 256 */ 257 void 258 lcbio_ctx_put(lcbio_CTX *ctx, const void *buf, unsigned nbuf); 259 260 /** 261 * Invoke the lcbio_CTXPROCS#cb_flush_ready() 262 * callback when a flush may be invoked. Note that the 263 * callback may be invoked from within this function itself, or 264 * it may be invoked at some point later. 265 * 266 * In order to ensure that the callback is actually invoked (in 267 * cases where it is not invoked immediately), call lcbio_ctx_schedule() 268 * before returning to the loop. 269 * 270 * When the callback is invoked, you should call the lcbio_ctx_put_ex() function 271 * to actually enqueue the data to be written. The lcbio_ctx_put_ex() function 272 * should be called multiple times until either no write buffers remain 273 * or the function itself returns a false value. The lcbio_ctx_put_ex() function 274 * may either flush the data immediately or schedule for the data to be flushed 275 * depending on the I/O implementation and network conditions. 276 * 277 * Once the data is actually flushed to the socket's buffers, the 278 * lcbio_CTXPROCS#cb_flush_done() callback is invoked. 279 * This callback indicates the underlying buffers are no longer required and may 280 * be released or reused by the application. Note that the IOV array passed 281 * into lcbio_ctx_put_ex() is always _Conceptually_ copied (i.e. 282 * this may be a stack-based structure which does not need to remain valid 283 * outside the function call to lcbio_ctx_put_ex() itself). 284 * 285 * Additionally, note that the number of bytes flushed within the 286 * lcbio_CTXPROCS#cb_flush_done() 287 * callback may not equal the number of bytes initially placed inside the IOVs 288 * (i.e. it may be less). In this case the application is expected to update 289 * the IOV structures and the origin buffers appropriately. 290 * 291 * This model allows for efficient handling in both completion and event based 292 * environments. 293 * 294 * ### Implementation Notes 295 * 296 * For completion-based models, the lcbio_CTXPROCS#cb_flush_ready() 297 * callback is invoked immediately from the wwant() function, while the 298 * flush_done() is dependent on the actual completion of the write. 299 * 300 * For event-based models, the wwant flag is set inside the context and is then 301 * checked by the lcbio_ctx_schedule() function. When the event handler is invoked, the 302 * flush_ready() callback is invoked as well - typically in a loop until an 303 * `EWOULDBLOCK` is received on the socket itself. 304 */ 305 void 306 lcbio_ctx_wwant(lcbio_CTX *ctx); 307 308 /** 309 * @brief Flush data from the lcbio_CTXPROCS#cb_flush_ready() callback 310 * 311 * This function is intended to be called from within the `cb_flush_ready` 312 * handler (see lcbio_ctx_wwant()). 313 * 314 * @param ctx 315 * @param iov the IOV array. The IOV array may point to a stack-based array 316 * @param niov number of elements in the array 317 * @param nb The total number of bytes described by all the elements of the 318 * array. 319 * 320 * @return nonzero if more data can be written (i.e. this function may be 321 * invoked again), zero otherwise. 322 */ 323 int 324 lcbio_ctx_put_ex(lcbio_CTX *ctx, lcb_IOV *iov, unsigned niov, unsigned nb); 325 326 /** 327 * Require that the read callback not be invoked until at least `n` 328 * bytes are available within the buffer. 329 * 330 * @param ctx 331 * @param n the number of bytes required to be in the buffer before the 332 * callback should be invoked. 333 * 334 * @note 335 * Note that this flag does _not_ maintain state between successive callbacks. 336 * You must call this function each time you need more data as it is cleared 337 * before the invocation into the callback. 338 * 339 * @note 340 * This function sets the number of **total** bytes 341 * which must be available in the buffer before the callback is invoked. Thus 342 * you should set this to the total number of bytes needed, and **not** the 343 * number of remaining bytes that should be read. 344 */ 345 void 346 lcbio_ctx_rwant(lcbio_CTX *ctx, unsigned n); 347 348 349 /** @private */ 350 typedef struct { 351 unsigned remaining; 352 void *buf; 353 unsigned nbuf; 354 } lcbio_CTXRDITER; 355 356 /** 357 * Iterate over the read buffers 358 * 359 * @code{.c} 360 * static void read_callback(lcbio_CTX *ctx, void *arg, unsigned nb) { 361 * lcbio_CTXRDITER iter; 362 * LCBIO_CTX_ITERFOR(ctx, &iter, nb) { 363 * void *buf = lcbio_ctx_ribuf(&iter); 364 * unsigned nbuf = lcbio_ctx_risize(&iter); 365 * // do stuff with the buffer 366 * } 367 * } 368 * @endcode 369 * 370 * When each iteration is complete, the pointer returned by ctx_ribuf is 371 * no longer valid. 372 * 373 * @param ctx the context which contains the buffer 374 * @param[in,out] iter an empty iterator 375 * @param[in] nb the number of bytes to iterate over. 376 */ 377 #define LCBIO_CTX_ITERFOR(ctx, iter, nb) \ 378 for (lcbio_ctx_ristart(ctx, iter, nb); !lcbio_ctx_ridone(iter); \ 379 lcbio_ctx_rinext(ctx, iter)) 380 381 /** Obtains the buffer from the current iterator */ 382 #define lcbio_ctx_ribuf(iter) ((iter)->buf) 383 384 /** Obtains the length of the buffer from the current iterator */ 385 #define lcbio_ctx_risize(iter) ((iter)->nbuf) 386 387 void 388 lcbio_ctx_ristart(lcbio_CTX *ctx, lcbio_CTXRDITER *iter, unsigned nb); 389 390 void 391 lcbio_ctx_rinext(lcbio_CTX *ctx, lcbio_CTXRDITER *iter); 392 393 #define lcbio_ctx_ridone(iter) (!(iter)->remaining) 394 395 #define LCBIO_CTX_RSCHEDULE(ctx, nb) do { \ 396 lcbio_ctx_rwant(ctx, nb); \ 397 lcbio_ctx_schedule(ctx); \ 398 } while (0) 399 400 #ifdef __cplusplus 401 } 402 #endif 403 #endif 404 405 /** @} */ 406