/*
 * Copyright (C) 2012-2020 all contributors <cmogstored-public@yhbt.net>
 * License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
 */
#include "cmogstored.h"
/*
 * a poll/select/libev/libevent-based implementation would have a hard time
 * migrating clients between threads
 */
#ifdef HAVE_KQUEUE

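/* creates a kqueue descriptor and wraps it in a new mog_queue */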
struct mog_queue * mog_queue_new(void)
{
	int kqueue_fd = kqueue();

	if (kqueue_fd < 0)
		die_errno("kqueue() failed");

	return mog_queue_init(kqueue_fd);
}

/*
 * grabs one active event off the event queue
 */
struct mog_fd * mog_idleq_wait(struct mog_queue *q, int timeout)
{
	int rc;
	struct mog_fd *mfd;
	struct kevent event;
	struct timespec ts;
	struct timespec *tsp;
	bool cancellable = timeout != 0;

	if (timeout < 0) {
		tsp = NULL;
	} else {
		ts.tv_sec = timeout / 1000;
		ts.tv_nsec = (timeout % 1000) * 1000000;
		tsp = &ts;
	}

	/*
	 * we enable SIGURG from pthread_kill() in thrpool.c when sleeping
	 * in kevent().  This allows us to wake up and respond to a
	 * cancellation request (since kevent() is not a cancellation point).
	 */
	if (cancellable)
		mog_thr_test_quit();

	rc = kevent(q->queue_fd, NULL, 0, &event, 1, tsp);

	if (rc > 0) {
		mfd = event.udata;
		mog_fd_check_out(mfd);

		/* ignore pending cancel until the next round */
		return mfd;
	}
	if (cancellable)
		mog_thr_test_quit();
	if (rc == 0)
		return NULL;

	if (errno != EINTR)
		die_errno("kevent(wait) failed with (%d)", rc);

	return NULL;
}

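/*
 * identical to mog_idleq_wait, but with interrupts (signals) enabled
 * while sleeping in kevent()
 */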
struct mog_fd * mog_idleq_wait_intr(struct mog_queue *q, int timeout)
{
	struct mog_fd *mfd;

	/* this is racy; using a self-pipe covers the race */
	mog_intr_enable();
	mfd = mog_idleq_wait(q, timeout);
	mog_intr_disable();
	return mfd;
}

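/*
 * slow path for kevent(EV_ADD) failures: ENOMEM drops the fd,
 * anything else is considered a bug in our usage
 */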
MOG_NOINLINE static void
kevent_add_error(struct mog_queue *q, struct mog_fd *mfd)
{
	switch (errno) {
	case ENOMEM:
		syslog(LOG_ERR,
		      "kevent(EV_ADD) out-of-space, dropping file descriptor");
		mog_queue_drop(mfd);
		return;
	default:
		syslog(LOG_ERR, "unhandled kevent(EV_ADD) error: %m");
		assert(0 && "BUG in our usage of kevent");
	}
}

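/* registers one change event with the kqueue, retrying on EINTR */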
static int add_event(int kqueue_fd, struct kevent *event)
{
	int rc;

	do {
		rc = kevent(kqueue_fd, event, 1, NULL, 0, NULL);
	} while (rc < 0 && errno == EINTR);

	return rc;
}

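/* arms a one-shot (EV_ONESHOT) watch for mfd on the kqueue */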
static void qpush(struct mog_queue *q, struct mog_fd *mfd, enum mog_qev ev)
{
	struct kevent event;
	u_short flags = EV_ADD | EV_ONESHOT;

	EV_SET(&event, mfd->fd, (short)ev, flags, 0, 0, mfd);

	mog_fd_check_in(mfd);
	if (add_event(q->queue_fd, &event) != 0) {
		mog_fd_check_out(mfd);
		kevent_add_error(q, mfd);
	}
}

/*
 * Pushes in one mog_fd for kqueue to watch.
 *
 * Only call this with MOG_QEV_RW *or* if EAGAIN/EWOULDBLOCK is
 * encountered in mog_queue_loop.
 */
void mog_idleq_push(struct mog_queue *q, struct mog_fd *mfd, enum mog_qev ev)
{
	if (ev == MOG_QEV_RW) {
		switch (mfd->fd_type) {
		case MOG_FD_TYPE_IOSTAT:
		case MOG_FD_TYPE_SELFWAKE:
			ev = MOG_QEV_RD;
			break;
		case MOG_FD_TYPE_UNUSED:
		case MOG_FD_TYPE_ACCEPT:
		case MOG_FD_TYPE_FILE:
		case MOG_FD_TYPE_QUEUE:
		case MOG_FD_TYPE_SVC:
			assert(0 && "invalid fd_type for mog_idleq_push");
		default:
			ev = MOG_QEV_WR;
			break;
		}
	}
	qpush(q, mfd, ev);
}

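/* with one-shot kevents, the initial watch is identical to a re-arm */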
void mog_idleq_add(struct mog_queue *q, struct mog_fd *mfd, enum mog_qev ev)
{
	mog_idleq_push(q, mfd, ev);
}

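/* re-arms mfd and waits for the next ready mog_fd */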
struct mog_fd *
mog_queue_xchg(struct mog_queue *q, struct mog_fd *mfd, enum mog_qev ev)
{
	/*
	 * kqueue() _should_ be able to implement this function with
	 * one syscall; however, we currently rely on mog_idleq_wait()
	 * being a cancellation point.  So we must ensure the mfd is
	 * back in the queue (for other threads to access) before
	 * cancelling this thread...
	 */
	mog_idleq_push(q, mfd, ev);

	return mog_idleq_wait(q, -1);
}
#else /* ! HAVE_KQUEUE */
typedef int avoid_empty_file;
#endif /* ! HAVE_KQUEUE */