1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Copyright 2014 Couchbase, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 #include "ctx.h"
19 #include "iotable.h"
20 #include "timer-ng.h"
21 #include "ioutils.h"
22 #include <stdio.h>
23 #include <lcbio/ssl.h>
24
/* Accessors for the context's underlying I/O handles */
#define CTX_FD(ctx) (ctx)->fd   /* event-model (V0) socket descriptor */
#define CTX_SD(ctx) (ctx)->sd   /* completion-model (V1) socket data */
#define CTX_IOT(ctx) (ctx)->io  /* I/O operation table */

/* Bump a per-socket metric counter, if metrics collection is enabled
 * on the owning socket. No-op otherwise. */
#define CTX_INCR_METRIC(ctx, metric, n) do { \
    if (ctx->sock && ctx->sock->metrics) { \
        ctx->sock->metrics->metric += n; \
    } \
} while (0)


/* Standard logging preamble for this module ("ioctx" subsystem) */
#define LOGARGS(c, lvl) (c)->sock->settings, "ioctx", LCB_LOG_##lvl, __FILE__, __LINE__
36
37 #include "rw-inl.h"
38
/* Lifecycle state of a context */
typedef enum {
    ES_ACTIVE = 0, /* context is live; callbacks may be invoked */
    ES_DETACHED    /* close requested; freed once pending I/O drains */
} easy_state;
43
44 static void
err_handler(void * cookie)45 err_handler(void *cookie)
46 {
47 lcbio_CTX *ctx = (void *)cookie;
48 ctx->procs.cb_err(ctx, ctx->err);
49 }
50
51 static lcb_error_t
convert_lcberr(const lcbio_CTX * ctx,lcbio_IOSTATUS status)52 convert_lcberr(const lcbio_CTX *ctx, lcbio_IOSTATUS status)
53 {
54 const lcb_settings *settings = ctx->sock->settings;
55 lcbio_OSERR oserr = IOT_ERRNO(ctx->sock->io);
56
57 if (lcbio_ssl_check(ctx->sock)) {
58 lcb_error_t err = lcbio_ssl_get_error(ctx->sock);
59 if (err) {
60 return err;
61 }
62 }
63
64 if (status == LCBIO_SHUTDOWN) {
65 return lcbio_mklcberr(0, settings);
66 } else if (oserr != 0) {
67 return lcbio_mklcberr(oserr, settings);
68 } else {
69 return LCB_NETWORK_ERROR;
70 }
71 }
72
73 lcbio_CTX *
lcbio_ctx_new(lcbio_SOCKET * sock,void * data,const lcbio_CTXPROCS * procs)74 lcbio_ctx_new(lcbio_SOCKET *sock, void *data, const lcbio_CTXPROCS *procs)
75 {
76 lcbio_CTX *ctx = calloc(1, sizeof(*ctx));
77 ctx->sock = sock;
78 sock->ctx = ctx;
79 ctx->io = sock->io;
80 ctx->data = data;
81 ctx->procs = *procs;
82 ctx->state = ES_ACTIVE;
83 ctx->as_err = lcbio_timer_new(ctx->io, ctx, err_handler);
84 ctx->subsys = "unknown";
85 sock->service = LCBIO_SERVICE_UNSPEC;
86 sock->atime = LCB_NS2US(gethrtime());
87
88 rdb_init(&ctx->ior, sock->settings->allocator_factory());
89 lcbio_ref(sock);
90
91 if (IOT_IS_EVENT(ctx->io)) {
92 ctx->event = IOT_V0EV(ctx->io).create(IOT_ARG(ctx->io));
93 ctx->fd = sock->u.fd;
94 } else {
95 ctx->sd = sock->u.sd;
96 }
97
98 ctx->procs = *procs;
99 ctx->state = ES_ACTIVE;
100
101 lcb_log(LOGARGS(ctx, DEBUG), CTX_LOGFMT "Pairing with SOCK=%016" PRIx64, CTX_LOGID(ctx), sock->id);
102 return ctx;
103 }
104
105 static void
free_ctx(lcbio_CTX * ctx)106 free_ctx(lcbio_CTX *ctx)
107 {
108 rdb_cleanup(&ctx->ior);
109 lcbio_unref(ctx->sock);
110 if (ctx->output) {
111 ringbuffer_destruct(&ctx->output->rb);
112 free(ctx->output);
113 }
114 if (ctx->procs.cb_flush_ready) {
115 /* dtor */
116 ctx->procs.cb_flush_ready(ctx);
117 }
118 free(ctx);
119 }
120
121 static void
deactivate_watcher(lcbio_CTX * ctx)122 deactivate_watcher(lcbio_CTX *ctx)
123 {
124 if (ctx->evactive && ctx->event) {
125 IOT_V0EV(CTX_IOT(ctx)).cancel(
126 IOT_ARG(CTX_IOT(ctx)), CTX_FD(ctx), ctx->event);
127 ctx->evactive = 0;
128 }
129 }
130
/*
 * Close the context. The context is detached immediately; the actual free
 * is deferred until all pending completion-model operations have drained
 * (or happens here if none are pending and we are not inside a callback).
 *
 * @param cb       optional: invoked with the socket and a "reusable" flag
 *                 (nonzero when the socket has no pending/buffered I/O and
 *                 no error) so the caller may pool the socket
 * @param arg      cookie for `cb`
 * @param dtor     optional destructor for `dtor_arg`; stored in the
 *                 cb_flush_ready slot and invoked later by free_ctx()
 * @param dtor_arg cookie for `dtor` (stored in ctx->data)
 */
void
lcbio_ctx_close_ex(lcbio_CTX *ctx, lcbio_CTXCLOSE_cb cb, void *arg,
                   lcbio_CTXDTOR_cb dtor, void *dtor_arg)
{
    unsigned oldrc;
    ctx->state = ES_DETACHED;
    lcb_assert(ctx->sock);

    /* Tear down the event watcher (event model only) */
    if (ctx->event) {
        deactivate_watcher(ctx);
        IOT_V0EV(CTX_IOT(ctx)).destroy(IOT_ARG(CTX_IOT(ctx)), ctx->event);
        ctx->event = NULL;
    }

    /* Cancel any pending async error delivery */
    if (ctx->as_err) {
        lcbio_timer_destroy(ctx->as_err);
        ctx->as_err = NULL;
    }

    /* Snapshot the refcount so we can detect whether `cb` took its own
     * reference to the socket (e.g. to pool it) */
    oldrc = ctx->sock->refcount;
    lcb_log(LOGARGS(ctx, DEBUG), CTX_LOGFMT "Destroying context. Pending Writes=%d, Entered=%s, Socket Refcount=%d", CTX_LOGID(ctx), (int)ctx->npending, (int)ctx->entered ? "true": "false", oldrc);

    if (cb) {
        int reusable =
            ctx->npending == 0 && /* no pending events */
            ctx->err == LCB_SUCCESS && /* no socket errors */
            ctx->rdwant == 0 && /* no expected input */
            ctx->wwant == 0 && /* no expected output */
            (ctx->output == NULL || ctx->output->rb.nbytes == 0);
        cb(ctx->sock, reusable, arg);
    }

    ctx->sock->ctx = NULL;
    /* Drop our reference only if `cb` did not take ownership */
    if (oldrc == ctx->sock->refcount) {
        lcbio_unref(ctx->sock);
    }

    /* Discard any unflushed output */
    if (ctx->output) {
        ringbuffer_destruct(&ctx->output->rb);
        free(ctx->output);
        ctx->output = NULL;
    }

    ctx->fd = INVALID_SOCKET;
    ctx->sd = NULL;

    /* Repurpose the cb_flush_ready slot as the deferred destructor hook
     * (consumed by free_ctx) */
    if (dtor) {
        ctx->data = dtor_arg;
        ctx->procs.cb_flush_ready = dtor;

    } else {
        ctx->procs.cb_flush_ready = NULL;
    }
    ctx->procs.cb_read = NULL;

    /* Free now only if nothing is pending and we are not inside a handler;
     * otherwise Cr_handler/Cw_handler/E_free_detached will free us later */
    if (ctx->npending == 0 && ctx->entered == 0) {
        free_ctx(ctx);
    }
}
190
191 void
lcbio_ctx_close(lcbio_CTX * ctx,lcbio_CTXCLOSE_cb cb,void * arg)192 lcbio_ctx_close(lcbio_CTX *ctx, lcbio_CTXCLOSE_cb cb, void *arg)
193 {
194 lcbio_ctx_close_ex(ctx, cb, arg, NULL, NULL);
195 }
196
197 void
lcbio_ctx_put(lcbio_CTX * ctx,const void * buf,unsigned nbuf)198 lcbio_ctx_put(lcbio_CTX *ctx, const void *buf, unsigned nbuf)
199 {
200 lcbio__EASYRB *erb = ctx->output;
201
202 if (!erb) {
203 ctx->output = erb = calloc(1, sizeof(*ctx->output));
204
205 if (!erb) {
206 lcbio_ctx_senderr(ctx, LCB_CLIENT_ENOMEM);
207 return;
208 }
209
210 erb->parent = ctx;
211
212 if (!ringbuffer_initialize(&erb->rb, nbuf)) {
213 lcbio_ctx_senderr(ctx, LCB_CLIENT_ENOMEM);
214 return;
215 }
216 }
217
218 if (!ringbuffer_ensure_capacity(&erb->rb, nbuf)) {
219 lcbio_ctx_senderr(ctx, LCB_CLIENT_ENOMEM);
220 return;
221 }
222
223 ringbuffer_write(&erb->rb, buf, nbuf);
224 }
225
226 void
lcbio_ctx_rwant(lcbio_CTX * ctx,unsigned n)227 lcbio_ctx_rwant(lcbio_CTX *ctx, unsigned n)
228 {
229 ctx->rdwant = n;
230 }
231
232 static void
set_iterbuf(lcbio_CTX * ctx,lcbio_CTXRDITER * iter)233 set_iterbuf(lcbio_CTX *ctx, lcbio_CTXRDITER *iter)
234 {
235 if ((iter->nbuf = rdb_get_contigsize(&ctx->ior))) {
236 if (iter->nbuf > iter->remaining) {
237 iter->nbuf = iter->remaining;
238 }
239 iter->buf = rdb_get_consolidated(&ctx->ior, iter->nbuf);
240 } else {
241 iter->buf = NULL;
242 }
243 }
244
245 void
lcbio_ctx_ristart(lcbio_CTX * ctx,lcbio_CTXRDITER * iter,unsigned nb)246 lcbio_ctx_ristart(lcbio_CTX *ctx, lcbio_CTXRDITER *iter, unsigned nb)
247 {
248 iter->remaining = nb;
249 set_iterbuf(ctx, iter);
250 }
251
252 void
lcbio_ctx_rinext(lcbio_CTX * ctx,lcbio_CTXRDITER * iter)253 lcbio_ctx_rinext(lcbio_CTX *ctx, lcbio_CTXRDITER *iter)
254 {
255 rdb_consumed(&ctx->ior, iter->nbuf);
256 iter->remaining -= iter->nbuf;
257 set_iterbuf(ctx, iter);
258 }
259
260 static int
E_free_detached(lcbio_CTX * ctx)261 E_free_detached(lcbio_CTX *ctx)
262 {
263 if (ctx->state == ES_DETACHED) {
264 free_ctx(ctx);
265 return 1;
266 }
267 return 0;
268 }
269
270 static void
invoke_read_cb(lcbio_CTX * ctx,unsigned nb)271 invoke_read_cb(lcbio_CTX *ctx, unsigned nb)
272 {
273 ctx->rdwant = 0;
274 if (ctx->procs.cb_read == NULL) {
275 return;
276 }
277 ctx->entered++;
278 ctx->procs.cb_read(ctx, nb);
279 ctx->entered--;
280 }
281
282 static void
send_io_error(lcbio_CTX * ctx,lcbio_IOSTATUS status)283 send_io_error(lcbio_CTX *ctx, lcbio_IOSTATUS status)
284 {
285 lcb_error_t rc = convert_lcberr(ctx, status);
286 CTX_INCR_METRIC(ctx, io_error, 1);
287 if (status == LCBIO_SHUTDOWN) {
288 CTX_INCR_METRIC(ctx, io_close, 1);
289 }
290 lcbio_ctx_senderr(ctx, rc);
291 }
292
/*
 * Event-model (V0) I/O handler, invoked by the event loop when the socket
 * becomes readable and/or writable. Reads are slurped into the read
 * buffer and delivered once the requested threshold is met; writes flush
 * either via the user's flush-ready callback or the buffered output.
 */
static void
E_handler(lcb_socket_t sock, short which, void *arg)
{
    lcbio_CTX *ctx = arg;
    lcbio_IOSTATUS status;
    (void)sock;

    if (which & LCB_READ_EVENT) {
        unsigned nb;
        /* Pull as much as the socket will give us into the read buffer */
        status = lcbio_E_rdb_slurp(ctx, &ctx->ior);
        nb = rdb_get_nused(&ctx->ior);

        ctx->sock->atime = LCB_NS2US(gethrtime());
        if (nb >= ctx->rdwant) {
            invoke_read_cb(ctx, nb);
            /* The read callback may have closed the context */
            if (E_free_detached(ctx)) {
                return;
            }
        }
        if (!LCBIO_IS_OK(status)) {
            send_io_error(ctx, status);
            return;
        }
        /* NOTE(review): `nb` is the total unread bytes in the buffer, not
         * only the bytes received in this slurp — confirm this is the
         * intended metric semantics. */
        CTX_INCR_METRIC(ctx, bytes_received, nb);
    }

    if (which & LCB_WRITE_EVENT) {
        if (ctx->wwant) {
            /* User requested a flush-ready notification */
            ctx->wwant = 0;
            ctx->procs.cb_flush_ready(ctx);
            if (ctx->err) {
                return;
            }
        } else if (ctx->output) {
            /* Flush buffered output */
            status = lcbio_E_rb_write(ctx, &ctx->output->rb);
            /** Metrics are logged by E_rb_write */
            if (!LCBIO_IS_OK(status)) {
                send_io_error(ctx, status);
                return;
            }
        }
    }

    /* Re-arm the watcher according to current read/write interest */
    lcbio_ctx_schedule(ctx);
}
338
339 static void
invoke_entered_errcb(lcbio_CTX * ctx,lcb_error_t err)340 invoke_entered_errcb(lcbio_CTX *ctx, lcb_error_t err)
341 {
342 ctx->err = err;
343 ctx->entered++;
344 ctx->procs.cb_err(ctx, err);
345 ctx->entered--;
346 }
347
/*
 * Completion-model (V1) write callback for output buffered via
 * lcbio_ctx_put(). Recycles or frees the easy ringbuffer, reports
 * failures, and frees the context if it was closed and this was the last
 * pending operation.
 */
static void
Cw_handler(lcb_sockdata_t *sd, int status, void *arg)
{
    lcbio__EASYRB *erb = arg;
    lcbio_CTX *ctx = erb->parent;
    (void)sd;

    ctx->npending--;
    CTX_INCR_METRIC(ctx, bytes_sent, erb->rb.nbytes);

    if (!ctx->output) {
        /* No current output buffer: recycle this one for the next batch */
        ctx->output = erb;
        ringbuffer_reset(&erb->rb);

    } else {
        /* Context already grew a new buffer; discard this one */
        ringbuffer_destruct(&erb->rb);
        free(erb);
    }

    if (ctx->state == ES_ACTIVE && status) {
        invoke_entered_errcb(ctx, convert_lcberr(ctx, LCBIO_IOERR));
    }

    /* Deferred free: context was closed while this write was in flight */
    if (ctx->state != ES_ACTIVE && ctx->npending == 0) {
        free_ctx(ctx);
    }
}
375
376 static void C_schedule(lcbio_CTX *ctx);
377
/*
 * Completion-model (V1) read callback. `nr` is the number of bytes read
 * (0 on orderly shutdown, negative on error). Delivers data to the user
 * once the read-want threshold is met, reschedules further I/O, and
 * performs the deferred free if the context was closed mid-flight.
 */
static void
Cr_handler(lcb_sockdata_t *sd, lcb_ssize_t nr, void *arg)
{
    lcbio_CTX *ctx = arg;
    sd->is_reading = 0;
    ctx->npending--;

    if (ctx->state == ES_ACTIVE) {
        ctx->sock->atime = LCB_NS2US(gethrtime());
        if (nr > 0) {
            unsigned total;
            /* Commit the bytes the I/O plugin wrote into our buffers */
            rdb_rdend(&ctx->ior, nr);
            total = rdb_get_nused(&ctx->ior);
            if (total >= ctx->rdwant) {
#ifdef LCB_DUMP_PACKETS
                {
                    /* Debug-only: log the received bytes as base64 */
                    char *b64 = NULL;
                    lcb_SIZE nb64 = 0;
                    char *buf = calloc(total, sizeof(char));
                    rdb_copyread(&ctx->ior, buf, total);
                    lcb_base64_encode2(buf, total, &b64, &nb64);
                    lcb_log(LOGARGS(ctx, TRACE), CTX_LOGFMT "pkt,rcv: size=%d, %.*s", CTX_LOGID(ctx), (int)nb64, (int)nb64, b64);
                    free(b64);
                    free(buf);
                }
#endif
                invoke_read_cb(ctx, total);
            }
            CTX_INCR_METRIC(ctx, bytes_received, total);
            lcbio_ctx_schedule(ctx);
        } else {
            /* nr < 0: I/O error; nr == 0: peer closed the connection */
            lcbio_IOSTATUS iostatus;
            lcb_error_t err;

            CTX_INCR_METRIC(ctx, io_error, 1);
            if (nr) {
                iostatus = LCBIO_IOERR;
            } else {
                iostatus = LCBIO_SHUTDOWN;
                CTX_INCR_METRIC(ctx, io_close, 1);
            }

            err = convert_lcberr(ctx, iostatus);
            ctx->rdwant = 0;
            invoke_entered_errcb(ctx, err);
        }
    }

    /* Deferred free: context was closed while this read was in flight */
    if (ctx->state != ES_ACTIVE && ctx->npending == 0) {
        free_ctx(ctx);
    }
}
430
/*
 * Completion-model (V1) scheduler: issue the pending write (buffered
 * output), notify flush-ready interest, and start a read if one is wanted
 * and not already in progress. Failures are routed through
 * send_io_error() for async delivery.
 */
static void
C_schedule(lcbio_CTX *ctx)
{
    lcbio_TABLE *io = ctx->io;
    lcb_sockdata_t *sd = CTX_SD(ctx);
    int rv;

    if (ctx->output && ctx->output->rb.nbytes) {
        /** Schedule a write */
        lcb_IOV iov[2] = {0};
        unsigned niov;

        /* Ringbuffer may wrap: up to two IOVs cover the readable region */
        ringbuffer_get_iov(&ctx->output->rb, RINGBUFFER_READ, iov);
        niov = iov[1].iov_len ? 2 : 1;
        rv = IOT_V1(io).write2(IOT_ARG(io), sd, iov, niov, ctx->output, Cw_handler);
        if (rv) {
            send_io_error(ctx, LCBIO_IOERR);
            return;
        } else {
            /* Ownership of the buffer passes to Cw_handler */
            ctx->output = NULL;
            ctx->npending++;
#ifdef LCB_DUMP_PACKETS
            {
                /* Debug-only: log the outgoing bytes as base64 */
                char *b64 = NULL;
                int nb64 = 0;
                lcb_base64_encode_iov((lcb_IOV *)iov, niov, iov[0].iov_len + iov[1].iov_len, &b64, &nb64);
                lcb_log(LOGARGS(ctx, TRACE), CTX_LOGFMT "pkt,snd: size=%d, %.*s", CTX_LOGID(ctx), nb64, nb64, b64);
                free(b64);
            }
#endif
        }
    }

    if (ctx->wwant) {
        /* User asked to be told when it is safe to write */
        ctx->wwant = 0;
        ctx->procs.cb_flush_ready(ctx);
    }

    if (ctx->rdwant && sd->is_reading == 0) {
        lcb_IOV iov[RWINL_IOVSIZE];
        unsigned ii;
        unsigned niov = rdb_rdstart(&ctx->ior, (nb_IOV *)iov, RWINL_IOVSIZE);

        lcb_assert(niov);
        for (ii = 0; ii < niov; ++ii) {
            lcb_assert(iov[ii].iov_len);
        }

        rv = IOT_V1(io).read2(IOT_ARG(io), sd, iov, niov, ctx, Cr_handler);
        if (rv) {
            send_io_error(ctx, LCBIO_IOERR);
        } else {
            sd->is_reading = 1;
            ctx->npending++;
        }
    }
}
488
489 static void
E_schedule(lcbio_CTX * ctx)490 E_schedule(lcbio_CTX *ctx)
491 {
492 lcbio_TABLE *io = ctx->io;
493 short which = 0;
494
495 if (ctx->rdwant) {
496 which |= LCB_READ_EVENT;
497 }
498 if (ctx->wwant || (ctx->output && ctx->output->rb.nbytes)) {
499 which |= LCB_WRITE_EVENT;
500 }
501
502 if (!which) {
503 deactivate_watcher(ctx);
504 return;
505 }
506
507 IOT_V0EV(io).watch(IOT_ARG(io), CTX_FD(ctx), ctx->event, which, ctx, E_handler);
508 ctx->evactive = 1;
509 }
510
511 void
lcbio_ctx_schedule(lcbio_CTX * ctx)512 lcbio_ctx_schedule(lcbio_CTX *ctx)
513 {
514 if (ctx->entered || ctx->err || ctx->state != ES_ACTIVE) {
515 /* don't schedule events on i/o errors or on entered state */
516 return;
517 }
518 if (IOT_IS_EVENT(ctx->io)) {
519 E_schedule(ctx);
520 } else {
521 C_schedule(ctx);
522 }
523 }
524
/** Extended function used for write-on-callback mode */
/*
 * Event-model direct write: attempt to send `iov` immediately via sendv.
 * Returns 1 if all I/O completed (cb_flush_done called with actual bytes),
 * 0 if the write would block or failed (cb_flush_done called with `nw`
 * possibly faked to `nb`, and the error delivered asynchronously).
 */
static int
E_put_ex(lcbio_CTX *ctx, lcb_IOV *iov, unsigned niov, unsigned nb)
{
    lcb_ssize_t nw;
    lcbio_TABLE *iot = ctx->io;
    lcb_socket_t fd = CTX_FD(ctx);

    GT_WRITE_AGAIN:
    /* Clamp to the plugin's IOV limit */
    nw = IOT_V0IO(iot).sendv(IOT_ARG(iot), fd, iov,
        niov <= RWINL_IOVSIZE ? niov : RWINL_IOVSIZE);
    if (nw > 0) {
        CTX_INCR_METRIC(ctx, bytes_sent, nw);
        ctx->procs.cb_flush_done(ctx, nb, nw);
        return 1;

    } else if (nw == -1) {
        switch (IOT_ERRNO(iot)) {
        case EINTR:
            /* jump back to retry */
            goto GT_WRITE_AGAIN;

        case C_EAGAIN:
        case EWOULDBLOCK:
            nw = 0;
            /* indicate zero bytes were written, but don't send an error */
            goto GT_WRITE0;
        default:
            /* pretend all the bytes were written and deliver an error during
             * the next event loop iteration. */
            nw = nb;
            send_io_error(ctx, LCBIO_IOERR);
            goto GT_WRITE0;
        }
    } else {
        /* connection closed. pretend everything was written and send an error */
        nw = nb;
        send_io_error(ctx, LCBIO_SHUTDOWN);
        goto GT_WRITE0;
    }

    GT_WRITE0:
    ctx->procs.cb_flush_done(ctx, nb, nw);
    return 0;
}
570
571 static void
Cw_ex_handler(lcb_sockdata_t * sd,int status,void * wdata)572 Cw_ex_handler(lcb_sockdata_t *sd, int status, void *wdata)
573 {
574 lcbio_CTX *ctx = ((lcbio_SOCKET *)sd->lcbconn)->ctx;
575 unsigned nflushed = (uintptr_t)wdata;
576 ctx->npending--;
577
578 CTX_INCR_METRIC(ctx, bytes_sent, nflushed);
579 ctx->entered = 1;
580 ctx->procs.cb_flush_done(ctx, nflushed, nflushed);
581 ctx->entered = 0;
582
583 if (ctx->state == ES_ACTIVE && status) {
584 CTX_INCR_METRIC(ctx, io_error, 1);
585 invoke_entered_errcb(ctx, convert_lcberr(ctx, LCBIO_IOERR));
586 }
587 if (ctx->state != ES_ACTIVE && !ctx->npending) {
588 free_ctx(ctx);
589 }
590 }
591
592 static int
C_put_ex(lcbio_CTX * ctx,lcb_IOV * iov,unsigned niov,unsigned nb)593 C_put_ex(lcbio_CTX *ctx, lcb_IOV *iov, unsigned niov, unsigned nb)
594 {
595 lcbio_TABLE *iot = ctx->io;
596 lcb_sockdata_t *sd = CTX_SD(ctx);
597 int status = IOT_V1(iot).write2(IOT_ARG(iot),
598 sd, iov, niov, (void *)(uintptr_t)nb, Cw_ex_handler);
599 if (status) {
600 /** error! */
601 lcbio_OSERR saverr = IOT_ERRNO(iot);
602 ctx->procs.cb_flush_done(ctx, nb, nb);
603 lcbio_ctx_senderr(ctx, lcbio_mklcberr(saverr, ctx->sock->settings));
604 return 0;
605 } else {
606 ctx->npending++;
607 return 1;
608 }
609 }
610
611 int
lcbio_ctx_put_ex(lcbio_CTX * ctx,lcb_IOV * iov,unsigned niov,unsigned nb)612 lcbio_ctx_put_ex(lcbio_CTX *ctx, lcb_IOV *iov, unsigned niov, unsigned nb)
613 {
614 lcbio_TABLE *iot = ctx->io;
615 if (IOT_IS_EVENT(iot)) {
616 return E_put_ex(ctx, iov, niov, nb);
617 } else {
618 return C_put_ex(ctx, iov, niov, nb);
619 }
620 }
621
622 void
lcbio_ctx_wwant(lcbio_CTX * ctx)623 lcbio_ctx_wwant(lcbio_CTX *ctx)
624 {
625 if ((IOT_IS_EVENT(ctx->io)) == 0 && ctx->entered == 0) {
626 ctx->procs.cb_flush_ready(ctx);
627 } else {
628 ctx->wwant = 1;
629 }
630 }
631
632 void
lcbio_ctx_senderr(lcbio_CTX * ctx,lcb_error_t err)633 lcbio_ctx_senderr(lcbio_CTX *ctx, lcb_error_t err)
634 {
635 if (ctx->err == LCB_SUCCESS) {
636 ctx->err = err;
637 }
638 deactivate_watcher(ctx);
639 lcbio_async_signal(ctx->as_err);
640 }
641
642 void
lcbio_ctx_dump(lcbio_CTX * ctx,FILE * fp)643 lcbio_ctx_dump(lcbio_CTX *ctx, FILE *fp)
644 {
645 fprintf(fp, "IOCTX=%p. SUBSYS=%s\n", (void*)ctx, ctx->subsys);
646 fprintf(fp, " Pending=%d\n", ctx->npending);
647 fprintf(fp, " ReqRead=%d\n", ctx->rdwant);
648 fprintf(fp, " WantWrite=%d\n", ctx->wwant);
649 fprintf(fp, " Entered=%d\n", ctx->entered);
650 fprintf(fp, " Active=%d\n", ctx->state == ES_ACTIVE);
651 fprintf(fp, " SOCKET=%p\n", (void*)ctx->sock);
652 fprintf(fp, " Model=%s\n", ctx->io->model == LCB_IOMODEL_EVENT ? "Event" : "Completion");
653 if (IOT_IS_EVENT(ctx->io)) {
654 fprintf(fp, " FD=%d\n", ctx->sock->u.fd);
655 fprintf(fp, " Watcher Active=%d\n", ctx->evactive);
656 } else {
657 fprintf(fp, " SD=%p\n", (void *)ctx->sock->u.sd);
658 fprintf(fp, " Reading=%d\n", ctx->sock->u.sd->is_reading);
659 }
660 fprintf(fp, " WILL DUMP IOR/READBUF INFO:\n");
661 rdb_dump(&ctx->ior, fp);
662 }
663