1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2014 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #ifndef LCBIO_CTXEASY_H
19 #define LCBIO_CTXEASY_H
20 #include "connect.h"
21 #include "rdb/rope.h"
22 #include "ringbuffer.h"
23 
24 #ifdef __cplusplus
25 extern "C" {
26 #endif
27 
28 /**
29  * @file
30  * This file contains routines for reading and writing from and to a socket
31  */
32 
33 /**
34  * @ingroup lcbio
35  * @defgroup lcbio-ctx Reading/Writing Routines
36  *
37  * @details
38  *
39  * # Attaching
40  *
41  * A context is first _attached_ to a socket and a _user-defined_ data object.
42  * The idea is that the context is the broker which schedules I/O on behalf
43  * of the application to the socket, and then receives events from the socket,
44  * passing it along to user-defined data.
45  *
46  * To create a new context, invoke the lcbio_ctx_new() function. When you are done
47  * with it call the lcbio_ctx_close() function.
48  *
49  * # Reading
50  *
51  * Reading data is done through first requesting an amount of data to read
52  * and then reading the data from the buffers when the lcbio_CTXPROCS#cb_read
53  * handler is invoked.
54  *
55  * Requesting an amount of data to be read may be dependent on some current
56  * parsing context. In cases where the expected message size is known the pattern
57  * is to initially request the size of the header; once the header has been
58  * delivered to the application, the application should request the full
59  * header+body size, and so on.
60  *
61  * For streaming based protocols where the amount of data is not known ahead of
62  * time, requesting a single byte may be sufficient. Note that typically the
63  * read callback will be invoked with _more_ bytes than requested.
64  *
65  * Data is read from the network as one or more chunks to improve performance
66  * and increase flexibility. Because of this model, you must iterate over the
67  * data read _or_ employ the underlying <rdb/rope.h> API. For streaming data
68  * sockets where data may simply be copied to another buffer, use the iterator
69  * API presented here.
70  *
71  * @note The RDB interface requires you to explicitly advance the read
72  * cursor in addition to actually obtaining the data. This is handled within
73  * the iterator interface declared in this file (but of course must be done
74  * manually if employing RDB directly).
75  *
76  *
77  * # Writing
78  *
79  * Writing can be done through a simple lcbio_ctx_put() which simply copies data
80  * to an output buffer, or can be done through more efficient but complex means -
81  * see the lcbio_ctx_wwant() function.
82  *
83  * # Scheduling
84  *
85  * Any I/O **must always be scheduled**. For maximal efficiency the various
86  * read/write functions only set internal flags within the contexts to act
87  * as hints that the application intends to read and write data. However in
88  * order to actually perform these operations, kernel/OS calls are needed.
89  *
90  * To indicate that no subsequent I/O operations will be requested until the next
91  * event loop iteration, _and_ to apply/schedule any existing I/O within the
92  * current iteration, the lcbio_ctx_schedule() function should be called.
93  *
94  *
95  * @addtogroup lcbio-ctx
96  * @{ */
97 
98 typedef struct lcbio_CTX *lcbio_pCTX;
99 
100 /**
101  * @brief Handlers for I/O events
102  */
103 typedef struct {
104     /** Error handler invoked with the context and the received error */
105     void (*cb_err)(lcbio_pCTX, lcb_error_t);
106 
107     /** Read handler invoked with the context and the number of bytes read */
108     void (*cb_read)(lcbio_pCTX, unsigned total);
109 
110     /** Triggered by lcbio_ctx_wwant() */
111     void (*cb_flush_ready)(lcbio_pCTX);
112 
113     /** Triggered when data has been flushed from lcbio_ctx_put_ex() */
114     void (*cb_flush_done)(lcbio_pCTX, unsigned requested, unsigned nflushed);
115 } lcbio_CTXPROCS;
116 
117 /**
118  * Container buffer handle containing a backref to the original context.
119  * @private
120  */
121 typedef struct {
122     ringbuffer_t rb;
123     lcbio_pCTX parent;
124 } lcbio__EASYRB;
125 
126 /**
127  * @brief Context for socket I/O
128  *
129  * The CTX structure represents an ownership of a socket. It provides routines
130  * for reading and writing from and to the socket, as well as associating
131  * application data with the socket.
132  */
133 typedef struct lcbio_CTX {
134     lcbio_SOCKET *sock; /**< Socket resource */
135     lcbio_pTABLE io; /**< Cached IO table */
136     void *data; /**< Associative pointer */
137     void *event; /**< event pointer for E-model I/O */
138     lcb_sockdata_t *sd; /**< cached SD for C-model I/O */
139     lcbio__EASYRB *output; /**< for lcbio_ctx_put() */
140     lcb_socket_t fd; /**< cached FD for E-model I/O */
141     char evactive; /**< watcher is active for E-model I/O */
142     char wwant; /**< flag for lcbio_ctx_put_ex */
143     char state; /**< internal state */
144     char entered; /**< inside event handler */
145     unsigned npending; /**< reference count on pending I/O */
146     unsigned rdwant; /**< number of remaining bytes to read */
147     lcb_error_t err; /**< pending error */
148     rdb_IOROPE ior; /**< for reads */
149     lcbio_pASYNC as_err; /**< async error handler */
150     lcbio_CTXPROCS procs; /**< callbacks */
151     const char *subsys; /**< Informational description of connection */
152 } lcbio_CTX;
153 
154 /**@name Creating and Closing
155  *@{*/
156 
157 /**
158  * Creates a new context object.
159  *
160  * The object remains valid until lcbio_ctx_close() is invoked.
161  *
162  * @param sock the underlying socket object. This function increments the
163  *        socket's reference count.
164  * @param data user defined data to associate with context. The data may be
165  *        obtained at a later point via lcbio_ctx_data()
166  *
167  * @param procs callback table
168  * @return a new context object.
169  */
170 lcbio_CTX *
171 lcbio_ctx_new(lcbio_SOCKET *sock, void *data, const lcbio_CTXPROCS *procs);
172 
173 
174 /**
175  * Callback invoked when the connection is about to be release
176  * @param sock the socket being released.
177  * @param releasable whether the socket may be reused
178  * @param arg an argument passed to the close() function.
179  *
180  * If you wish to reuse the socket (and reusable is true) then the socket's
181  * reference count should be incremented via lcbio_ref().
182  */
183 typedef void
184 (*lcbio_CTXCLOSE_cb)(lcbio_SOCKET *sock, int releasable, void *arg);
185 
186 /**
187  * @brief Close the context object.
188  *
189  * This will invalidate any pending I/O operations
190  * and subsequent callbacks on the context will not be received. After calling
191  * this function, the pointer will be deemed invalid.
192  *
193  * @param ctx
194  * @param callback a callback to invoke (see above)
195  * @param arg argument passed to the callback
196  */
197 void
198 lcbio_ctx_close(lcbio_CTX *ctx, lcbio_CTXCLOSE_cb callback, void *arg);
199 
200 typedef void (*lcbio_CTXDTOR_cb)(lcbio_pCTX);
201 void
202 lcbio_ctx_close_ex(lcbio_CTX *ctx, lcbio_CTXCLOSE_cb cb, void *cbarg,
203                    lcbio_CTXDTOR_cb dtor, void *dtor_arg);
204 /**@}*/
205 
206 
207 /**@name Informational Routines
208  * @{*/
209 
210 /**
211  * Get the data associated with the context. This is the pointer specified
212  * during the constructor
213  */
214 #define lcbio_ctx_data(ctx) (ctx)->data
215 
216 /** Get the associated lcbio_SOCKET object */
217 #define lcbio_ctx_sock(ctx) (ctx)->sock
218 
219 /** Dump a textual representation of the context to the screen */
220 void
221 lcbio_ctx_dump(lcbio_CTX *ctx, FILE *fp);
222 
223 /**@}*/
224 
225 /** Asynchronously trigger the error callback */
226 void
227 lcbio_ctx_senderr(lcbio_CTX *ctx, lcb_error_t err);
228 
229 
230 /**
231  * Schedule any pending I/O to be scheduled immediately. If data was requested
232  * via lcbio_ctx_rwant() then a request will be sent for reading. If data was
233  * requested to be flushed either via lcbio_ctx_put() or lcbio_ctx_wwant()
234  * then those will be scheduled as well.
235  *
236  * This call is a no-op if invoked from within the current handler, as this
237  * function is invoked implicitly after the I/O handler itself has returned.
238  *
239  * It is safe (though typically not efficient) to invoke this function
240  * multiple times. Each invocation may potentially involve system calls
241  * and buffer allocations, depending on the I/O plugin being used.
242  */
243 void
244 lcbio_ctx_schedule(lcbio_CTX *ctx);
245 
246 /**
247  * @brief Add output data to the write buffer.
248  *
249  * The data added is copied to an internal buffer and flushed to the network
250  * when appropriate. If you wish to have more control over how the data is written
251  * then see the lcbio_ctx_wwant() function.
252  *
253  * @param ctx
254  * @param buf the buffer to write
255  * @param nbuf the size of the buffer to write
256  */
257 void
258 lcbio_ctx_put(lcbio_CTX *ctx, const void *buf, unsigned nbuf);
259 
260 /**
261  * Invoke the lcbio_CTXPROCS#cb_flush_ready()
262  * callback when a flush may be invoked. Note that the
263  * callback may be invoked from within this function itself, or
264  * it may be invoked at some point later.
265  *
266  * In order to ensure that the callback is actually invoked (in
267  * cases where it is not invoked immediately), call lcbio_ctx_schedule()
268  * before returning to the loop.
269  *
270  * When the callback is invoked, you should call the lcbio_ctx_put_ex() function
271  * to actually enqueue the data to be written. The lcbio_ctx_put_ex() function
272  * should be called multiple times until either no write buffers remain
273  * or the function itself returns a false value. The lcbio_ctx_put_ex() function
274  * may either flush the data immediately or schedule for the data to be flushed
275  * depending on the I/O implementation and network conditions.
276  *
277  * Once the data is actually flushed to the socket's buffers, the
278  * lcbio_CTXPROCS#cb_flush_done() callback is invoked.
279  * This callback indicates the underlying buffers are no longer required and may
280  * be released or reused by the application. Note that the IOV array passed
281  * into lcbio_ctx_put_ex() is always _Conceptually_ copied (i.e.
282  * this may be a stack-based structure which does not need to remain valid
283  * outside the function call to lcbio_ctx_put_ex() itself).
284  *
285  * Additionally, note that the number of bytes flushed within the
286  * lcbio_CTXPROCS#cb_flush_done()
287  * callback may not equal the number of bytes initially placed inside the IOVs
288  * (i.e. it may be less). In this case the application is expected to update
289  * the IOV structures and the origin buffers appropriately.
290  *
291  * This model allows for efficient handling in both completion and event based
292  * environments.
293  *
294  * ### Implementation Notes
295  *
296  * For completion-based models, the lcbio_CTXPROCS#cb_flush_ready()
297  * callback is invoked immediately from the wwant() function, while the
298  * flush_done() is dependent on the actual completion of the write.
299  *
300  * For event-based models, the wwant flag is set inside the context and is then
301  * checked by the lcbio_ctx_schedule() function. When the event handler is invoked, the
302  * flush_ready() callback is invoked as well - typically in a loop until an
303  * `EWOULDBLOCK` is received on the socket itself.
304  */
305 void
306 lcbio_ctx_wwant(lcbio_CTX *ctx);
307 
308 /**
309  * @brief Flush data from the lcbio_CTXPROCS#cb_flush_ready() callback
310  *
311  * This function is intended to be called from within the `cb_flush_ready`
312  * handler (see lcbio_ctx_wwant()).
313  *
314  * @param ctx
315  * @param iov the IOV array. The IOV array may point to a stack-based array
316  * @param niov number of elements in the array
317  * @param nb The total number of bytes described by all the elements of the
318  * array.
319  *
320  * @return nonzero if more data can be written (i.e. this function may be
321  * invoked again), zero otherwise.
322  */
323 int
324 lcbio_ctx_put_ex(lcbio_CTX *ctx, lcb_IOV *iov, unsigned niov, unsigned nb);
325 
326 /**
327  * Require that the read callback not be invoked until at least `n`
328  * bytes are available within the buffer.
329  *
330  * @param ctx
331  * @param n the number of bytes required to be in the buffer before the
332  *        callback should be invoked.
333  *
334  * @note
335  * Note that this flag does _not_ maintain state between successive callbacks.
336  * You must call this function each time you need more data as it is cleared
337  * before the invocation into the callback.
338  *
339  * @note
340  * This function sets the number of **total** bytes
341  * which must be available in the buffer before the callback is invoked. Thus
342  * you should set this to the total number of bytes needed, and **not** the
343  * number of remaining bytes that should be read.
344  */
345 void
346 lcbio_ctx_rwant(lcbio_CTX *ctx, unsigned n);
347 
348 
349 /** @private */
350 typedef struct {
351     unsigned remaining;
352     void *buf;
353     unsigned nbuf;
354 } lcbio_CTXRDITER;
355 
356 /**
357  * Iterate over the read buffers
358  *
359  * @code{.c}
360  * static void read_callback(lcbio_CTX *ctx, void *arg, unsigned nb) {
361  *     lcbio_CTXRDITER iter;
362  *     LCBIO_CTX_ITERFOR(ctx, &iter, nb) {
363  *         void *buf = lcbio_ctx_ribuf(&iter);
364  *         unsigned nbuf = lcbio_ctx_risize(&iter);
365  *          // do stuff with the buffer
366  *     }
367  * }
368  * @endcode
369  *
370  * When each iteration is complete, the pointer returned by ctx_ribuf is
371  * no longer valid.
372  *
373  * @param ctx the context which contains the buffer
374  * @param[in,out] iter an empty iterator
375  * @param[in] nb the number of bytes to iterate over.
376  */
377 #define LCBIO_CTX_ITERFOR(ctx, iter, nb) \
378     for (lcbio_ctx_ristart(ctx, iter, nb); !lcbio_ctx_ridone(iter); \
379     lcbio_ctx_rinext(ctx, iter))
380 
381 /** Obtains the buffer from the current iterator */
382 #define lcbio_ctx_ribuf(iter) ((iter)->buf)
383 
384 /** Obtains the length of the buffer from the current iterator */
385 #define lcbio_ctx_risize(iter) ((iter)->nbuf)
386 
387 void
388 lcbio_ctx_ristart(lcbio_CTX *ctx, lcbio_CTXRDITER *iter, unsigned nb);
389 
390 void
391 lcbio_ctx_rinext(lcbio_CTX *ctx, lcbio_CTXRDITER *iter);
392 
393 #define lcbio_ctx_ridone(iter) (!(iter)->remaining)
394 
395 #define LCBIO_CTX_RSCHEDULE(ctx, nb) do { \
396     lcbio_ctx_rwant(ctx, nb); \
397     lcbio_ctx_schedule(ctx); \
398 } while (0)
399 
400 #ifdef __cplusplus
401 }
402 #endif
403 #endif
404 
405 /** @} */
406