/*
 * Copyright (C) 2012-2020 all contributors
 * License: GPL-3.0+
 */
#include "cmogstored.h"
#include "mgmt.h"
#include "digest.h"
#include "ioprio.h"

static void mgmt_digest_step(struct mog_fd *mfd)
{
	struct mog_mgmt *mgmt = &mfd->as.mgmt;
	struct mog_fd *fmfd = mgmt->forward;
	enum mog_digest_next next;

	/*
	 * MOG_PRIO_FSCK means we're likely the _only_ thread handling
	 * MD5, so run it as fast as possible.
	 */
	if (mgmt->prio == MOG_PRIO_FSCK) {
		int ioprio = mog_ioprio_drop();

		do {
			next = mog_digest_read(&fmfd->as.file.digest, fmfd->fd);
		} while (next == MOG_DIGEST_CONTINUE);

		if (ioprio != -1)
			mog_ioprio_restore(ioprio);
	} else {
		next = mog_digest_read(&fmfd->as.file.digest, fmfd->fd);
	}

	assert(mgmt->wbuf == NULL && "wbuf should be NULL here");

	switch (next) {
	case MOG_DIGEST_CONTINUE:
	case MOG_DIGEST_YIELD:
		return;
	case MOG_DIGEST_EOF:
		mog_mgmt_fn_digest_emit(mfd);
		break;
	case MOG_DIGEST_ERROR:
		mog_mgmt_fn_digest_err(mfd);
	}

	mog_file_close(mgmt->forward);
	mgmt->prio = MOG_PRIO_NONE;
	mgmt->forward = NULL;
}

static enum mog_next mgmt_digest_in_progress(struct mog_fd *mfd)
{
	struct mog_mgmt *mgmt = &mfd->as.mgmt;
	struct mog_file *file;

	assert(mgmt->forward && mgmt->forward != MOG_IOSTAT && "bad forward");
	file = &mgmt->forward->as.file;

	if (file->ioq && !mog_ioq_ready(file->ioq, mfd))
		return MOG_NEXT_IGNORE;

	mgmt_digest_step(mfd);

	if (mgmt->wbuf == MOG_WR_ERROR)
		return MOG_NEXT_CLOSE;
	if (mgmt->wbuf)
		return MOG_NEXT_WAIT_WR;

	/*
	 * we can error on the MD5 but continue if we didn't
	 * have a socket error (from wbuf == MOG_WR_ERROR)
	 */
	return MOG_NEXT_ACTIVE;
}

MOG_NOINLINE static void mgmt_close(struct mog_fd *mfd)
{
	struct mog_mgmt *mgmt = &mfd->as.mgmt;

	mog_rbuf_reattach_and_null(&mgmt->rbuf);
	assert((mgmt->wbuf == NULL || mgmt->wbuf == MOG_WR_ERROR) &&
	       "would leak mgmt->wbuf on close");
	mog_fd_put(mfd);
}

/* called only if epoll/kevent is out-of-space (see mog_http_drop) */
void mog_mgmt_drop(struct mog_fd *mfd)
{
	struct mog_mgmt *mgmt = &mfd->as.mgmt;

	if (mgmt->forward && mgmt->forward != MOG_IOSTAT)
		mog_file_close(mgmt->forward);
	mgmt_close(mfd);
}

void mog_mgmt_writev(struct mog_mgmt *mgmt, struct iovec *iov, int iovcnt)
{
	struct mog_fd *mfd = mog_fd_of(mgmt);

	assert(mgmt->wbuf == NULL && "tried to write while busy");
	mgmt->wbuf = mog_trywritev(mfd->fd, iov, iovcnt);
}

static enum mog_next mgmt_iostat_forever(struct mog_mgmt *mgmt)
{
	mog_rbuf_reattach_and_null(&mgmt->rbuf); /* no coming back from this */
	mog_notify(MOG_NOTIFY_DEVICE_REFRESH);
	mog_svc_devstats_subscribe(mgmt);

	return MOG_NEXT_IGNORE;
}

/* flushes any buffered response, returns the next queue action to take */
static enum mog_next mgmt_wbuf_in_progress(struct mog_mgmt *mgmt)
{
	assert(mgmt->wbuf != MOG_WR_ERROR && "still active after write error");

	switch (mog_tryflush(mog_fd_of(mgmt)->fd, &mgmt->wbuf)) {
	case MOG_WRSTATE_ERR:
		return MOG_NEXT_CLOSE;
	case MOG_WRSTATE_DONE:
		if (mgmt->forward == MOG_IOSTAT)
			return mgmt_iostat_forever(mgmt);
		return MOG_NEXT_ACTIVE;
	case MOG_WRSTATE_BUSY:
		/* unlikely, we never put anything big in wbuf */
		return MOG_NEXT_WAIT_WR;
	}

	assert(0 && "compiler bug?");
	return MOG_NEXT_CLOSE;
}

/* stash any pipelined data for the next round */
static void
mgmt_defer_rbuf(struct mog_mgmt *mgmt, struct mog_rbuf *rbuf, size_t buf_len)
{
	struct mog_rbuf *old = mgmt->rbuf;
	size_t defer_bytes = buf_len - mgmt->buf_off;
	char *src = rbuf->rptr + mgmt->buf_off;

	assert(mgmt->buf_off >= 0 && "mgmt->buf_off negative");
	assert(defer_bytes <= MOG_RBUF_MAX_SIZE && "defer bytes overflow");

	if (defer_bytes == 0) {
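		/* nothing left to carry over: release mgmt->rbuf (if any) and clear it */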
		mog_rbuf_reattach_and_null(&mgmt->rbuf);
	} else if (old) { /* no allocation needed, reuse existing */
		assert(old == rbuf && "mgmt->rbuf not reused properly");
		memmove(old->rptr, src, defer_bytes);
		old->rsize = defer_bytes;
	} else {
		mgmt->rbuf = mog_rbuf_new(defer_bytes);
		memcpy(mgmt->rbuf->rptr, src, defer_bytes);
		mgmt->rbuf->rsize = defer_bytes;
	}

	mgmt->buf_off = 0;
}

static bool mgmt_process_client(struct mog_fd *mfd, struct mog_rbuf *rbuf,
				char *buf, size_t buf_len)
{
	struct mog_mgmt *mgmt = &mfd->as.mgmt;
	struct mog_dev *dev;
	struct mog_ioq *ioq;

	/* we handle non-filesystem-using commands inline in the parser */
	if (mgmt->mgmt_method == MOG_MGMT_METHOD_NONE)
		return true;

	dev = mog_dev_for(mgmt->svc, mgmt->mog_devid, false);

	if (dev) {
		ioq = mgmt->prio == MOG_PRIO_NONE ? &dev->ioq : &dev->fsckq;
		if (!mog_ioq_ready(ioq, mfd)) {
			if (!mgmt->rbuf)
				mgmt->rbuf = mog_rbuf_detach(rbuf);
			mgmt->rbuf->rsize = buf_len;
			return false;
		}
	}

	switch (mgmt->mgmt_method) {
	case MOG_MGMT_METHOD_NONE:
		assert(0 && "we should never get here: MOG_MGMT_METHOD_NONE");
	case MOG_MGMT_METHOD_SIZE:
		mog_mgmt_fn_size(mgmt, buf);
		break;
	case MOG_MGMT_METHOD_DIG:
		mog_mgmt_fn_digest(mfd, buf);
		if (dev && mgmt->forward)
			assert(mgmt->forward->as.file.ioq && "ioq not stashed");
		break;
	}

	mgmt->mgmt_method = MOG_MGMT_METHOD_NONE;

	return true;
}

static enum mog_next mgmt_run(struct mog_fd *mfd, struct mog_rbuf *rbuf,
				char *buf, size_t buf_len)
{
	struct mog_mgmt *mgmt = &mfd->as.mgmt;

	if (!mgmt_process_client(mfd, rbuf, buf, buf_len))
		return MOG_NEXT_IGNORE; /* in ioq */
	if (mgmt->wbuf == MOG_WR_ERROR)
		return MOG_NEXT_CLOSE;
	if (mgmt->forward == MOG_IOSTAT)
		return mgmt_iostat_forever(mgmt);

	/* stash unread portion in a new buffer */
	mgmt_defer_rbuf(mgmt, rbuf, buf_len);
	mog_mgmt_reset_parser(mgmt);
	assert(mgmt->wbuf != MOG_WR_ERROR);

	return mgmt->wbuf ? MOG_NEXT_WAIT_WR : MOG_NEXT_ACTIVE;
}

static char *
mgmt_rbuf_grow(struct mog_fd *mfd, struct mog_rbuf **rbuf, size_t buf_len)
{
	struct mog_mgmt *mgmt = &mfd->as.mgmt;

	TRACE(CMOGSTORED_MGMT_RBUF_GROW(mfd->fd, buf_len));
	(*rbuf)->rsize = buf_len;
	mgmt->rbuf = *rbuf = mog_rbuf_grow(*rbuf);

	return *rbuf ? (*rbuf)->rptr : NULL;
}

MOG_NOINLINE static bool
mgmt_parse_continue(struct mog_fd *mfd, struct mog_rbuf **rbuf,
			char **buf, size_t buf_len, off_t *off)
{
	struct mog_mgmt *mgmt = &mfd->as.mgmt;

	TRACE(CMOGSTORED_MGMT_PARSE_CONTINUE(mfd->fd, buf_len));

	assert(mgmt->wbuf == NULL &&
	       "tried to write (and failed) with partial req");

	if (mgmt->buf_off >= (*rbuf)->rcapa) {
		*buf = mgmt_rbuf_grow(mfd, rbuf, buf_len);
		if (!*buf)
			return false;
	}

	*off = mgmt->buf_off;

	return true;
}

/*
 * this is the main event callback and called whenever mgmt
 * is pulled out of a queue (either idle or active)
 */
static enum mog_next __mgmt_queue_step(struct mog_fd *mfd)
{
	struct mog_mgmt *mgmt = &mfd->as.mgmt;
	struct mog_rbuf *rbuf;
	char *buf;
	ssize_t r;
	off_t off;
	size_t buf_len = 0;
	enum mog_parser_state state;

	assert(mfd->fd >= 0 && "mgmt fd is invalid");
	if (mgmt->wbuf)
		return mgmt_wbuf_in_progress(mgmt);
	if (mgmt->forward)
		return mgmt_digest_in_progress(mfd);

	/* we may have pipelined data in mgmt->rbuf */
	rbuf = mgmt->rbuf ? mgmt->rbuf : mog_rbuf_get(MOG_RBUF_BASE_SIZE);
	buf = rbuf->rptr;
	off = mgmt->buf_off;
	assert(off >= 0 && "offset is negative");
	assert(off <= rbuf->rcapa && "offset is too big");

	if (mgmt->rbuf) {
		buf_len = mgmt->rbuf->rsize;
		if (mog_ioq_unblock(mfd))
			return mgmt_run(mfd, rbuf, buf, buf_len);
		assert(off < rbuf->rcapa && "offset is too big");
		if (off == 0) /* request got "pipelined", resuming now */
			goto parse;
	}
	assert(off < rbuf->rcapa && "offset is too big");
reread:
	r = read(mfd->fd, buf + off, rbuf->rcapa - off);
	if (r > 0) {
		buf_len = r + off;
parse:
		state = mog_mgmt_parse(mgmt, buf, buf_len);
		if (mgmt->wbuf == MOG_WR_ERROR)
			return MOG_NEXT_CLOSE;

		switch (state) {
		case MOG_PARSER_ERROR:
			syslog(LOG_ERR, "mgmt parser error");
			return MOG_NEXT_CLOSE;
		case MOG_PARSER_CONTINUE:
			if (mgmt_parse_continue(mfd, &rbuf, &buf, buf_len, &off))
				goto reread;
			goto too_large;
		case MOG_PARSER_DONE:
			return mgmt_run(mfd, rbuf, buf, buf_len);
		}
	} else if (r == 0) { /* client shut down */
		TRACE(CMOGSTORED_MGMT_CLIENT_CLOSE(mfd->fd, buf_len));
		return MOG_NEXT_CLOSE;
	} else {
		switch (errno) {
		case_EAGAIN:
			if (buf_len > 0) {
				if (mgmt->rbuf == NULL)
					mgmt->rbuf = mog_rbuf_detach(rbuf);
				mgmt->rbuf->rsize = buf_len;
			}
			return MOG_NEXT_WAIT_RD;
		case EINTR:
			goto reread;
		case ECONNRESET:
		case ENOTCONN:
			/* these errors are too common to log, normally */
			TRACE(CMOGSTORED_MGMT_RDERR(mfd->fd, buf_len, errno));
			return MOG_NEXT_CLOSE;
		default:
			TRACE(CMOGSTORED_MGMT_RDERR(mfd->fd, buf_len, errno));
			syslog(LOG_NOTICE, "mgmt client died: %m");
			return MOG_NEXT_CLOSE;
		}
	}

	assert(0 && "compiler bug?");
too_large:
	syslog(LOG_ERR, "mgmt request too large");
	return MOG_NEXT_CLOSE;
}

static enum mog_next mgmt_queue_step(struct mog_fd *mfd)
{
	enum mog_next ret = __mgmt_queue_step(mfd);

	/* enqueue any pending waiters before we become enqueued ourselves */
	mog_ioq_next(NULL);

	return ret;
}

/*
 * this function is called whenever a mgmt client is pulled out of
 * _any_ queue (listen/idle/active).  Our queueing model should be
 * designed to prevent this function from executing concurrently
 * for any fd.
 */
enum mog_next mog_mgmt_queue_step(struct mog_fd *mfd)
{
	enum mog_next rv = mgmt_queue_step(mfd);

	if (rv == MOG_NEXT_CLOSE)
		mgmt_close(mfd);

	return rv;
}

/* called during graceful shutdown instead of mog_mgmt_queue_step */
void mog_mgmt_quit_step(struct mog_fd *mfd)
{
	struct mog_mgmt *mgmt = &mfd->as.mgmt;
	struct mog_queue *q = mgmt->svc->queue;

	/* centralize all queue transitions here: */
	switch (mgmt_queue_step(mfd)) {
	case MOG_NEXT_WAIT_RD:
		if (mgmt->forward || mgmt->rbuf) {
			/* something is in progress, do not drop it */
			mog_idleq_push(q, mfd, MOG_QEV_RD);
			return;
		}
		/* fall-through */
	case MOG_NEXT_IGNORE:
		/* no new iostat watchers during shutdown */
		assert(mgmt->prio == MOG_PRIO_NONE && "bad prio");
		/* fall-through */
	case MOG_NEXT_CLOSE:
		mog_nr_active_at_quit--;
		mgmt_close(mfd);
		return;
	case MOG_NEXT_ACTIVE:
		mog_activeq_push(q, mfd);
		return;
	case MOG_NEXT_WAIT_WR:
		mog_idleq_push(q, mfd, MOG_QEV_WR);
		return;
	}
}

/* stringify the address for tracers */
static MOG_NOINLINE void
trace_mgmt_accepted(struct mog_fd *mfd, const char *listen_addr,
			union mog_sockaddr *msa, socklen_t salen)
{
#ifdef HAVE_SYSTEMTAP
	struct mog_packaddr mpa;
	struct mog_ni ni;

	mog_nameinfo(&mpa, &ni);
	TRACE(CMOGSTORED_MGMT_ACCEPTED(mfd->fd,
			ni.ni_host, ni.ni_serv, listen_addr));
#endif /* !HAVE_SYSTEMTAP */
}

/* called immediately after accept(), this initializes the mfd (once) */
void mog_mgmt_post_accept(int fd, struct mog_accept *ac,
			union mog_sockaddr *msa, socklen_t salen)
{
	struct mog_fd *mfd = mog_fd_init(fd, MOG_FD_TYPE_MGMT);
	struct mog_mgmt *mgmt = &mfd->as.mgmt;

	if (TRACE_ENABLED(CMOGSTORED_MGMT_ACCEPTED))
		trace_mgmt_accepted(mfd, ac->addrinfo->orig, msa, salen);

	mog_mgmt_init(mgmt, ac->svc);
	mog_idleq_add(ac->svc->queue, mfd, MOG_QEV_RD);
}