1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2014 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include "ctx.h"
19 #include "iotable.h"
20 #include "timer-ng.h"
21 #include "ioutils.h"
22 #include <stdio.h>
23 #include <lcbio/ssl.h>
24 
25 #define CTX_FD(ctx) (ctx)->fd
26 #define CTX_SD(ctx) (ctx)->sd
27 #define CTX_IOT(ctx) (ctx)->io
28 #define CTX_INCR_METRIC(ctx, metric, n) do { \
29     if (ctx->sock && ctx->sock->metrics) { \
30         ctx->sock->metrics->metric += n; \
31     } \
32 } while (0)
33 
34 
35 #define LOGARGS(c, lvl) (c)->sock->settings, "ioctx", LCB_LOG_##lvl, __FILE__, __LINE__
36 
37 #include "rw-inl.h"
38 
39 typedef enum {
40     ES_ACTIVE = 0,
41     ES_DETACHED
42 } easy_state;
43 
44 static void
err_handler(void * cookie)45 err_handler(void *cookie)
46 {
47     lcbio_CTX *ctx = (void *)cookie;
48     ctx->procs.cb_err(ctx, ctx->err);
49 }
50 
51 static lcb_error_t
convert_lcberr(const lcbio_CTX * ctx,lcbio_IOSTATUS status)52 convert_lcberr(const lcbio_CTX *ctx, lcbio_IOSTATUS status)
53 {
54     const lcb_settings *settings = ctx->sock->settings;
55     lcbio_OSERR oserr = IOT_ERRNO(ctx->sock->io);
56 
57     if (lcbio_ssl_check(ctx->sock)) {
58         lcb_error_t err = lcbio_ssl_get_error(ctx->sock);
59         if (err) {
60             return err;
61         }
62     }
63 
64     if (status == LCBIO_SHUTDOWN) {
65         return lcbio_mklcberr(0, settings);
66     } else if (oserr != 0) {
67         return lcbio_mklcberr(oserr, settings);
68     } else {
69         return LCB_NETWORK_ERROR;
70     }
71 }
72 
73 lcbio_CTX *
lcbio_ctx_new(lcbio_SOCKET * sock,void * data,const lcbio_CTXPROCS * procs)74 lcbio_ctx_new(lcbio_SOCKET *sock, void *data, const lcbio_CTXPROCS *procs)
75 {
76     lcbio_CTX *ctx = calloc(1, sizeof(*ctx));
77     ctx->sock = sock;
78     sock->ctx = ctx;
79     ctx->io = sock->io;
80     ctx->data = data;
81     ctx->procs = *procs;
82     ctx->state = ES_ACTIVE;
83     ctx->as_err = lcbio_timer_new(ctx->io, ctx, err_handler);
84     ctx->subsys = "unknown";
85     sock->service = LCBIO_SERVICE_UNSPEC;
86     sock->atime = LCB_NS2US(gethrtime());
87 
88     rdb_init(&ctx->ior, sock->settings->allocator_factory());
89     lcbio_ref(sock);
90 
91     if (IOT_IS_EVENT(ctx->io)) {
92         ctx->event = IOT_V0EV(ctx->io).create(IOT_ARG(ctx->io));
93         ctx->fd = sock->u.fd;
94     } else {
95         ctx->sd = sock->u.sd;
96     }
97 
98     ctx->procs = *procs;
99     ctx->state = ES_ACTIVE;
100 
101     lcb_log(LOGARGS(ctx, DEBUG), CTX_LOGFMT "Pairing with SOCK=%016" PRIx64, CTX_LOGID(ctx), sock->id);
102     return ctx;
103 }
104 
105 static void
free_ctx(lcbio_CTX * ctx)106 free_ctx(lcbio_CTX *ctx)
107 {
108     rdb_cleanup(&ctx->ior);
109     lcbio_unref(ctx->sock);
110     if (ctx->output) {
111         ringbuffer_destruct(&ctx->output->rb);
112         free(ctx->output);
113     }
114     if (ctx->procs.cb_flush_ready) {
115         /* dtor */
116         ctx->procs.cb_flush_ready(ctx);
117     }
118     free(ctx);
119 }
120 
121 static void
deactivate_watcher(lcbio_CTX * ctx)122 deactivate_watcher(lcbio_CTX *ctx)
123 {
124     if (ctx->evactive && ctx->event) {
125         IOT_V0EV(CTX_IOT(ctx)).cancel(
126                 IOT_ARG(CTX_IOT(ctx)), CTX_FD(ctx), ctx->event);
127         ctx->evactive = 0;
128     }
129 }
130 
131 void
lcbio_ctx_close_ex(lcbio_CTX * ctx,lcbio_CTXCLOSE_cb cb,void * arg,lcbio_CTXDTOR_cb dtor,void * dtor_arg)132 lcbio_ctx_close_ex(lcbio_CTX *ctx, lcbio_CTXCLOSE_cb cb, void *arg,
133                    lcbio_CTXDTOR_cb dtor, void *dtor_arg)
134 {
135     unsigned oldrc;
136     ctx->state = ES_DETACHED;
137     lcb_assert(ctx->sock);
138 
139     if (ctx->event) {
140         deactivate_watcher(ctx);
141         IOT_V0EV(CTX_IOT(ctx)).destroy(IOT_ARG(CTX_IOT(ctx)), ctx->event);
142         ctx->event = NULL;
143     }
144 
145     if (ctx->as_err) {
146         lcbio_timer_destroy(ctx->as_err);
147         ctx->as_err = NULL;
148     }
149 
150     oldrc = ctx->sock->refcount;
151     lcb_log(LOGARGS(ctx, DEBUG), CTX_LOGFMT "Destroying context. Pending Writes=%d, Entered=%s, Socket Refcount=%d", CTX_LOGID(ctx), (int)ctx->npending, (int)ctx->entered ? "true": "false", oldrc);
152 
153     if (cb) {
154         int reusable =
155                 ctx->npending == 0 && /* no pending events */
156                 ctx->err == LCB_SUCCESS && /* no socket errors */
157                 ctx->rdwant == 0 && /* no expected input */
158                 ctx->wwant == 0 && /* no expected output */
159                 (ctx->output == NULL || ctx->output->rb.nbytes == 0);
160         cb(ctx->sock, reusable, arg);
161     }
162 
163     ctx->sock->ctx = NULL;
164     if (oldrc == ctx->sock->refcount) {
165         lcbio_unref(ctx->sock);
166     }
167 
168     if (ctx->output) {
169         ringbuffer_destruct(&ctx->output->rb);
170         free(ctx->output);
171         ctx->output = NULL;
172     }
173 
174     ctx->fd = INVALID_SOCKET;
175     ctx->sd = NULL;
176 
177     if (dtor) {
178         ctx->data = dtor_arg;
179         ctx->procs.cb_flush_ready = dtor;
180 
181     } else {
182         ctx->procs.cb_flush_ready = NULL;
183     }
184     ctx->procs.cb_read = NULL;
185 
186     if (ctx->npending == 0 && ctx->entered == 0) {
187         free_ctx(ctx);
188     }
189 }
190 
191 void
lcbio_ctx_close(lcbio_CTX * ctx,lcbio_CTXCLOSE_cb cb,void * arg)192 lcbio_ctx_close(lcbio_CTX *ctx, lcbio_CTXCLOSE_cb cb, void *arg)
193 {
194     lcbio_ctx_close_ex(ctx, cb, arg, NULL, NULL);
195 }
196 
197 void
lcbio_ctx_put(lcbio_CTX * ctx,const void * buf,unsigned nbuf)198 lcbio_ctx_put(lcbio_CTX *ctx, const void *buf, unsigned nbuf)
199 {
200     lcbio__EASYRB *erb = ctx->output;
201 
202     if (!erb) {
203         ctx->output = erb = calloc(1, sizeof(*ctx->output));
204 
205         if (!erb) {
206             lcbio_ctx_senderr(ctx, LCB_CLIENT_ENOMEM);
207             return;
208         }
209 
210         erb->parent = ctx;
211 
212         if (!ringbuffer_initialize(&erb->rb, nbuf)) {
213             lcbio_ctx_senderr(ctx, LCB_CLIENT_ENOMEM);
214             return;
215         }
216     }
217 
218     if (!ringbuffer_ensure_capacity(&erb->rb, nbuf)) {
219         lcbio_ctx_senderr(ctx, LCB_CLIENT_ENOMEM);
220         return;
221     }
222 
223     ringbuffer_write(&erb->rb, buf, nbuf);
224 }
225 
226 void
lcbio_ctx_rwant(lcbio_CTX * ctx,unsigned n)227 lcbio_ctx_rwant(lcbio_CTX *ctx, unsigned n)
228 {
229     ctx->rdwant = n;
230 }
231 
232 static void
set_iterbuf(lcbio_CTX * ctx,lcbio_CTXRDITER * iter)233 set_iterbuf(lcbio_CTX *ctx, lcbio_CTXRDITER *iter)
234 {
235     if ((iter->nbuf = rdb_get_contigsize(&ctx->ior))) {
236         if (iter->nbuf > iter->remaining) {
237             iter->nbuf = iter->remaining;
238         }
239         iter->buf = rdb_get_consolidated(&ctx->ior, iter->nbuf);
240     } else {
241         iter->buf = NULL;
242     }
243 }
244 
245 void
lcbio_ctx_ristart(lcbio_CTX * ctx,lcbio_CTXRDITER * iter,unsigned nb)246 lcbio_ctx_ristart(lcbio_CTX *ctx, lcbio_CTXRDITER *iter, unsigned nb)
247 {
248     iter->remaining = nb;
249     set_iterbuf(ctx, iter);
250 }
251 
252 void
lcbio_ctx_rinext(lcbio_CTX * ctx,lcbio_CTXRDITER * iter)253 lcbio_ctx_rinext(lcbio_CTX *ctx, lcbio_CTXRDITER *iter)
254 {
255     rdb_consumed(&ctx->ior, iter->nbuf);
256     iter->remaining -= iter->nbuf;
257     set_iterbuf(ctx, iter);
258 }
259 
260 static int
E_free_detached(lcbio_CTX * ctx)261 E_free_detached(lcbio_CTX *ctx)
262 {
263     if (ctx->state == ES_DETACHED) {
264         free_ctx(ctx);
265         return 1;
266     }
267     return 0;
268 }
269 
270 static void
invoke_read_cb(lcbio_CTX * ctx,unsigned nb)271 invoke_read_cb(lcbio_CTX *ctx, unsigned nb)
272 {
273     ctx->rdwant = 0;
274     if (ctx->procs.cb_read == NULL) {
275         return;
276     }
277     ctx->entered++;
278     ctx->procs.cb_read(ctx, nb);
279     ctx->entered--;
280 }
281 
282 static void
send_io_error(lcbio_CTX * ctx,lcbio_IOSTATUS status)283 send_io_error(lcbio_CTX *ctx, lcbio_IOSTATUS status)
284 {
285     lcb_error_t rc = convert_lcberr(ctx, status);
286     CTX_INCR_METRIC(ctx, io_error, 1);
287     if (status == LCBIO_SHUTDOWN) {
288         CTX_INCR_METRIC(ctx, io_close, 1);
289     }
290     lcbio_ctx_senderr(ctx, rc);
291 }
292 
293 static void
E_handler(lcb_socket_t sock,short which,void * arg)294 E_handler(lcb_socket_t sock, short which, void *arg)
295 {
296     lcbio_CTX *ctx = arg;
297     lcbio_IOSTATUS status;
298     (void)sock;
299 
300     if (which & LCB_READ_EVENT) {
301         unsigned nb;
302         status = lcbio_E_rdb_slurp(ctx, &ctx->ior);
303         nb = rdb_get_nused(&ctx->ior);
304 
305         ctx->sock->atime = LCB_NS2US(gethrtime());
306         if (nb >= ctx->rdwant) {
307             invoke_read_cb(ctx, nb);
308             if (E_free_detached(ctx)) {
309                 return;
310             }
311         }
312         if (!LCBIO_IS_OK(status)) {
313             send_io_error(ctx, status);
314             return;
315         }
316         CTX_INCR_METRIC(ctx, bytes_received, nb);
317     }
318 
319     if (which & LCB_WRITE_EVENT) {
320         if (ctx->wwant) {
321             ctx->wwant = 0;
322             ctx->procs.cb_flush_ready(ctx);
323             if (ctx->err) {
324                 return;
325             }
326         } else if (ctx->output) {
327             status = lcbio_E_rb_write(ctx, &ctx->output->rb);
328             /** Metrics are logged by E_rb_write */
329             if (!LCBIO_IS_OK(status)) {
330                 send_io_error(ctx, status);
331                 return;
332             }
333         }
334     }
335 
336     lcbio_ctx_schedule(ctx);
337 }
338 
339 static void
invoke_entered_errcb(lcbio_CTX * ctx,lcb_error_t err)340 invoke_entered_errcb(lcbio_CTX *ctx, lcb_error_t err)
341 {
342     ctx->err = err;
343     ctx->entered++;
344     ctx->procs.cb_err(ctx, err);
345     ctx->entered--;
346 }
347 
348 static void
Cw_handler(lcb_sockdata_t * sd,int status,void * arg)349 Cw_handler(lcb_sockdata_t *sd, int status, void *arg)
350 {
351     lcbio__EASYRB *erb = arg;
352     lcbio_CTX *ctx = erb->parent;
353     (void)sd;
354 
355     ctx->npending--;
356     CTX_INCR_METRIC(ctx, bytes_sent, erb->rb.nbytes);
357 
358     if (!ctx->output) {
359         ctx->output = erb;
360         ringbuffer_reset(&erb->rb);
361 
362     } else {
363         ringbuffer_destruct(&erb->rb);
364         free(erb);
365     }
366 
367     if (ctx->state == ES_ACTIVE && status) {
368         invoke_entered_errcb(ctx, convert_lcberr(ctx, LCBIO_IOERR));
369     }
370 
371     if (ctx->state != ES_ACTIVE && ctx->npending == 0) {
372         free_ctx(ctx);
373     }
374 }
375 
376 static void C_schedule(lcbio_CTX *ctx);
377 
378 static void
Cr_handler(lcb_sockdata_t * sd,lcb_ssize_t nr,void * arg)379 Cr_handler(lcb_sockdata_t *sd, lcb_ssize_t nr, void *arg)
380 {
381     lcbio_CTX *ctx = arg;
382     sd->is_reading = 0;
383     ctx->npending--;
384 
385     if (ctx->state == ES_ACTIVE) {
386         ctx->sock->atime = LCB_NS2US(gethrtime());
387         if (nr > 0) {
388             unsigned total;
389             rdb_rdend(&ctx->ior, nr);
390             total = rdb_get_nused(&ctx->ior);
391             if (total >= ctx->rdwant) {
392 #ifdef LCB_DUMP_PACKETS
393                 {
394                     char *b64 = NULL;
395                     lcb_SIZE nb64 = 0;
396                     char *buf = calloc(total, sizeof(char));
397                     rdb_copyread(&ctx->ior, buf, total);
398                     lcb_base64_encode2(buf, total, &b64, &nb64);
399                     lcb_log(LOGARGS(ctx, TRACE), CTX_LOGFMT "pkt,rcv: size=%d, %.*s", CTX_LOGID(ctx), (int)nb64, (int)nb64, b64);
400                     free(b64);
401                     free(buf);
402                 }
403 #endif
404                 invoke_read_cb(ctx, total);
405             }
406             CTX_INCR_METRIC(ctx, bytes_received, total);
407             lcbio_ctx_schedule(ctx);
408         } else {
409             lcbio_IOSTATUS iostatus;
410             lcb_error_t err;
411 
412             CTX_INCR_METRIC(ctx, io_error, 1);
413             if (nr) {
414                 iostatus = LCBIO_IOERR;
415             } else {
416                 iostatus = LCBIO_SHUTDOWN;
417                 CTX_INCR_METRIC(ctx, io_close, 1);
418             }
419 
420             err = convert_lcberr(ctx, iostatus);
421             ctx->rdwant = 0;
422             invoke_entered_errcb(ctx, err);
423         }
424     }
425 
426     if (ctx->state != ES_ACTIVE && ctx->npending == 0) {
427         free_ctx(ctx);
428     }
429 }
430 
431 static void
C_schedule(lcbio_CTX * ctx)432 C_schedule(lcbio_CTX *ctx)
433 {
434     lcbio_TABLE *io = ctx->io;
435     lcb_sockdata_t *sd = CTX_SD(ctx);
436     int rv;
437 
438     if (ctx->output && ctx->output->rb.nbytes) {
439         /** Schedule a write */
440         lcb_IOV iov[2] = {0};
441         unsigned niov;
442 
443         ringbuffer_get_iov(&ctx->output->rb, RINGBUFFER_READ, iov);
444         niov = iov[1].iov_len ? 2 : 1;
445         rv = IOT_V1(io).write2(IOT_ARG(io), sd, iov, niov, ctx->output, Cw_handler);
446         if (rv) {
447             send_io_error(ctx, LCBIO_IOERR);
448             return;
449         } else {
450             ctx->output = NULL;
451             ctx->npending++;
452 #ifdef LCB_DUMP_PACKETS
453             {
454                 char *b64 = NULL;
455                 int nb64 = 0;
456                 lcb_base64_encode_iov((lcb_IOV *)iov, niov, iov[0].iov_len + iov[1].iov_len, &b64, &nb64);
457                 lcb_log(LOGARGS(ctx, TRACE), CTX_LOGFMT "pkt,snd: size=%d, %.*s", CTX_LOGID(ctx), nb64, nb64, b64);
458                 free(b64);
459             }
460 #endif
461         }
462     }
463 
464     if (ctx->wwant) {
465         ctx->wwant = 0;
466         ctx->procs.cb_flush_ready(ctx);
467     }
468 
469     if (ctx->rdwant && sd->is_reading == 0) {
470         lcb_IOV iov[RWINL_IOVSIZE];
471         unsigned ii;
472         unsigned niov = rdb_rdstart(&ctx->ior, (nb_IOV *)iov, RWINL_IOVSIZE);
473 
474         lcb_assert(niov);
475         for (ii = 0; ii < niov; ++ii) {
476             lcb_assert(iov[ii].iov_len);
477         }
478 
479         rv = IOT_V1(io).read2(IOT_ARG(io), sd, iov, niov, ctx, Cr_handler);
480         if (rv) {
481             send_io_error(ctx, LCBIO_IOERR);
482         } else {
483             sd->is_reading = 1;
484             ctx->npending++;
485         }
486     }
487 }
488 
489 static void
E_schedule(lcbio_CTX * ctx)490 E_schedule(lcbio_CTX *ctx)
491 {
492     lcbio_TABLE *io = ctx->io;
493     short which = 0;
494 
495     if (ctx->rdwant) {
496         which |= LCB_READ_EVENT;
497     }
498     if (ctx->wwant || (ctx->output && ctx->output->rb.nbytes)) {
499         which |= LCB_WRITE_EVENT;
500     }
501 
502     if (!which) {
503         deactivate_watcher(ctx);
504         return;
505     }
506 
507     IOT_V0EV(io).watch(IOT_ARG(io), CTX_FD(ctx), ctx->event, which, ctx, E_handler);
508     ctx->evactive = 1;
509 }
510 
511 void
lcbio_ctx_schedule(lcbio_CTX * ctx)512 lcbio_ctx_schedule(lcbio_CTX *ctx)
513 {
514     if (ctx->entered || ctx->err || ctx->state != ES_ACTIVE) {
515         /* don't schedule events on i/o errors or on entered state */
516         return;
517     }
518     if (IOT_IS_EVENT(ctx->io)) {
519         E_schedule(ctx);
520     } else {
521         C_schedule(ctx);
522     }
523 }
524 
525 /** Extended function used for write-on-callback mode */
526 static int
E_put_ex(lcbio_CTX * ctx,lcb_IOV * iov,unsigned niov,unsigned nb)527 E_put_ex(lcbio_CTX *ctx, lcb_IOV *iov, unsigned niov, unsigned nb)
528 {
529     lcb_ssize_t nw;
530     lcbio_TABLE *iot = ctx->io;
531     lcb_socket_t fd = CTX_FD(ctx);
532 
533     GT_WRITE_AGAIN:
534     nw = IOT_V0IO(iot).sendv(IOT_ARG(iot), fd, iov,
535         niov <= RWINL_IOVSIZE ? niov : RWINL_IOVSIZE);
536     if (nw > 0) {
537         CTX_INCR_METRIC(ctx, bytes_sent, nw);
538         ctx->procs.cb_flush_done(ctx, nb, nw);
539         return 1;
540 
541     } else if (nw == -1) {
542         switch (IOT_ERRNO(iot)) {
543         case EINTR:
544             /* jump back to retry */
545             goto GT_WRITE_AGAIN;
546 
547         case C_EAGAIN:
548         case EWOULDBLOCK:
549             nw = 0;
550             /* indicate zero bytes were written, but don't send an error */
551             goto GT_WRITE0;
552         default:
553             /* pretend all the bytes were written and deliver an error during
554              * the next event loop iteration. */
555             nw = nb;
556             send_io_error(ctx, LCBIO_IOERR);
557             goto GT_WRITE0;
558         }
559     } else {
560         /* connection closed. pretend everything was written and send an error */
561         nw = nb;
562         send_io_error(ctx, LCBIO_SHUTDOWN);
563         goto GT_WRITE0;
564     }
565 
566     GT_WRITE0:
567     ctx->procs.cb_flush_done(ctx, nb, nw);
568     return 0;
569 }
570 
571 static void
Cw_ex_handler(lcb_sockdata_t * sd,int status,void * wdata)572 Cw_ex_handler(lcb_sockdata_t *sd, int status, void *wdata)
573 {
574     lcbio_CTX *ctx = ((lcbio_SOCKET *)sd->lcbconn)->ctx;
575     unsigned nflushed = (uintptr_t)wdata;
576     ctx->npending--;
577 
578     CTX_INCR_METRIC(ctx, bytes_sent, nflushed);
579     ctx->entered = 1;
580     ctx->procs.cb_flush_done(ctx, nflushed, nflushed);
581     ctx->entered = 0;
582 
583     if (ctx->state == ES_ACTIVE && status) {
584         CTX_INCR_METRIC(ctx, io_error, 1);
585         invoke_entered_errcb(ctx, convert_lcberr(ctx, LCBIO_IOERR));
586     }
587     if (ctx->state != ES_ACTIVE && !ctx->npending) {
588         free_ctx(ctx);
589     }
590 }
591 
592 static int
C_put_ex(lcbio_CTX * ctx,lcb_IOV * iov,unsigned niov,unsigned nb)593 C_put_ex(lcbio_CTX *ctx, lcb_IOV *iov, unsigned niov, unsigned nb)
594 {
595     lcbio_TABLE *iot = ctx->io;
596     lcb_sockdata_t *sd = CTX_SD(ctx);
597     int status = IOT_V1(iot).write2(IOT_ARG(iot),
598         sd, iov, niov, (void *)(uintptr_t)nb, Cw_ex_handler);
599     if (status) {
600         /** error! */
601         lcbio_OSERR saverr = IOT_ERRNO(iot);
602         ctx->procs.cb_flush_done(ctx, nb, nb);
603         lcbio_ctx_senderr(ctx, lcbio_mklcberr(saverr, ctx->sock->settings));
604         return 0;
605     } else {
606         ctx->npending++;
607         return 1;
608     }
609 }
610 
611 int
lcbio_ctx_put_ex(lcbio_CTX * ctx,lcb_IOV * iov,unsigned niov,unsigned nb)612 lcbio_ctx_put_ex(lcbio_CTX *ctx, lcb_IOV *iov, unsigned niov, unsigned nb)
613 {
614     lcbio_TABLE *iot = ctx->io;
615     if (IOT_IS_EVENT(iot)) {
616         return E_put_ex(ctx, iov, niov, nb);
617     } else {
618         return C_put_ex(ctx, iov, niov, nb);
619     }
620 }
621 
622 void
lcbio_ctx_wwant(lcbio_CTX * ctx)623 lcbio_ctx_wwant(lcbio_CTX *ctx)
624 {
625     if ((IOT_IS_EVENT(ctx->io)) == 0 && ctx->entered == 0) {
626         ctx->procs.cb_flush_ready(ctx);
627     } else {
628         ctx->wwant = 1;
629     }
630 }
631 
632 void
lcbio_ctx_senderr(lcbio_CTX * ctx,lcb_error_t err)633 lcbio_ctx_senderr(lcbio_CTX *ctx, lcb_error_t err)
634 {
635     if (ctx->err == LCB_SUCCESS) {
636         ctx->err = err;
637     }
638     deactivate_watcher(ctx);
639     lcbio_async_signal(ctx->as_err);
640 }
641 
642 void
lcbio_ctx_dump(lcbio_CTX * ctx,FILE * fp)643 lcbio_ctx_dump(lcbio_CTX *ctx, FILE *fp)
644 {
645     fprintf(fp, "IOCTX=%p. SUBSYS=%s\n", (void*)ctx, ctx->subsys);
646     fprintf(fp, "  Pending=%d\n", ctx->npending);
647     fprintf(fp, "  ReqRead=%d\n", ctx->rdwant);
648     fprintf(fp, "  WantWrite=%d\n", ctx->wwant);
649     fprintf(fp, "  Entered=%d\n", ctx->entered);
650     fprintf(fp, "  Active=%d\n", ctx->state == ES_ACTIVE);
651     fprintf(fp, "  SOCKET=%p\n", (void*)ctx->sock);
652     fprintf(fp, "    Model=%s\n", ctx->io->model == LCB_IOMODEL_EVENT ? "Event" : "Completion");
653     if (IOT_IS_EVENT(ctx->io)) {
654         fprintf(fp, "    FD=%d\n", ctx->sock->u.fd);
655         fprintf(fp, "    Watcher Active=%d\n", ctx->evactive);
656     } else {
657         fprintf(fp, "    SD=%p\n", (void *)ctx->sock->u.sd);
658         fprintf(fp, "    Reading=%d\n", ctx->sock->u.sd->is_reading);
659     }
660     fprintf(fp, "    WILL DUMP IOR/READBUF INFO:\n");
661     rdb_dump(&ctx->ior, fp);
662 }
663