1 /*
2 * Copyright (C) 2012-2020 all contributors <cmogstored-public@yhbt.net>
3 * License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
4 */
5 #include "cmogstored.h"
6 /*
7 * a poll/select/libev/libevent-based implementation would have a hard time
8 * migrating clients between threads
9 */
10 #ifdef HAVE_KQUEUE
11
/*
 * Creates a fresh kqueue descriptor and hands it to mog_queue_init(),
 * returning whatever mog_queue_init() returns.
 *
 * A kqueue() failure is reported through die_errno()
 * (NOTE(review): die_errno() is presumably fatal -- confirm).
 */
struct mog_queue * mog_queue_new(void)
{
	const int fd = kqueue();

	if (fd < 0) {
		die_errno("kqueue() failed");
	}

	return mog_queue_init(fd);
}
21
/*
 * grabs one active event off the event queue
 *
 * timeout is in milliseconds: a negative value blocks without a time
 * limit (NULL timespec), zero polls, and a zero timeout also skips the
 * mog_thr_test_quit() checks.
 *
 * Returns the mog_fd stored in the event's udata (after
 * mog_fd_check_out()), or NULL if no event was retrieved (timeout or
 * EINTR).  Any other kevent() failure is reported via die_errno().
 */
struct mog_fd * mog_idleq_wait(struct mog_queue *q, int timeout)
{
	int rc;
	int saved_errno;
	struct kevent event;
	struct timespec ts;
	struct timespec *tsp = NULL;
	bool cancellable = timeout != 0;

	if (timeout >= 0) {
		/* split milliseconds into whole seconds + nanoseconds */
		ts.tv_sec = timeout / 1000;
		ts.tv_nsec = (timeout % 1000) * 1000000;
		tsp = &ts;
	}

	/*
	 * we enable SIGURG from pthread_kill() in thrpool.c when sleeping
	 * in kevent().  This allows us to wake up and respond to a
	 * cancellation request (since kevent() is not a cancellation point).
	 */
	if (cancellable)
		mog_thr_test_quit();

	rc = kevent(q->queue_fd, NULL, 0, &event, 1, tsp);

	/*
	 * capture errno right away: mog_thr_test_quit() below is free to
	 * overwrite it before we get to inspect the kevent() failure
	 */
	saved_errno = errno;

	if (rc > 0) {
		struct mog_fd *mfd = event.udata;

		mog_fd_check_out(mfd);

		/* ignore pending cancel until the next round */
		return mfd;
	}

	if (cancellable)
		mog_thr_test_quit();

	/* timed out without any event */
	if (rc == 0)
		return NULL;

	if (saved_errno != EINTR) {
		/* die_errno() reports errno, so put the kevent() value back */
		errno = saved_errno;
		die_errno("kevent(wait) failed with (%d)", rc);
	}

	/* interrupted by a signal; the caller simply sees "no event" */
	return NULL;
}
69
/*
 * Same as mog_idleq_wait(), but the wait is bracketed by
 * mog_intr_enable() / mog_intr_disable().
 * (NOTE(review): presumably this lets signals interrupt the wait --
 * confirm against the mog_intr_* implementation.)
 */
struct mog_fd * mog_idleq_wait_intr(struct mog_queue *q, int timeout)
{
	struct mog_fd *ready;

	/* this is racy, using a self-pipe covers the race */
	mog_intr_enable();
	ready = mog_idleq_wait(q, timeout);
	mog_intr_disable();

	return ready;
}
80
81 MOG_NOINLINE static void
kevent_add_error(struct mog_queue * q,struct mog_fd * mfd)82 kevent_add_error(struct mog_queue *q, struct mog_fd *mfd)
83 {
84 switch (errno) {
85 case ENOMEM:
86 syslog(LOG_ERR,
87 "kevent(EV_ADD) out-of-space, dropping file descriptor");
88 mog_queue_drop(mfd);
89 return;
90 default:
91 syslog(LOG_ERR, "unhandled kevent(EV_ADD) error: %m");
92 assert(0 && "BUG in our usage of kevent");
93 }
94 }
95
/*
 * Submits a single change to the kqueue without retrieving any events.
 * The call is retried for as long as it fails with EINTR.
 *
 * Returns the result of the last kevent() call.
 */
static int add_event(int kqueue_fd, struct kevent *event)
{
	for (;;) {
		int rc = kevent(kqueue_fd, event, 1, NULL, 0, NULL);

		/* anything other than an EINTR failure ends the retry loop */
		if (rc >= 0 || errno != EINTR)
			return rc;
	}
}
106
/*
 * Registers mfd->fd with the kqueue as a one-shot event using filter ev;
 * mfd itself travels along as the event's udata.
 *
 * The mog_fd is checked in before the registration attempt and checked
 * back out again if the registration fails, after which the failure is
 * passed to kevent_add_error().
 */
static void qpush(struct mog_queue *q, struct mog_fd *mfd, enum mog_qev ev)
{
	struct kevent kev;

	EV_SET(&kev, mfd->fd, (short)ev, EV_ADD | EV_ONESHOT, 0, 0, mfd);

	mog_fd_check_in(mfd);
	if (add_event(q->queue_fd, &kev) == 0)
		return;

	/* registration failed: undo the check-in before handling the error */
	mog_fd_check_out(mfd);
	kevent_add_error(q, mfd);
}
120
121 /*
122 * Pushes in one mog_fd for kqueue to watch.
123 *
124 * Only call this with MOG_QEV_RW *or* if EAGAIN/EWOULDBLOCK is
125 * encountered in mog_queue_loop.
126 */
mog_idleq_push(struct mog_queue * q,struct mog_fd * mfd,enum mog_qev ev)127 void mog_idleq_push(struct mog_queue *q, struct mog_fd *mfd, enum mog_qev ev)
128 {
129 if (ev == MOG_QEV_RW) {
130 switch (mfd->fd_type) {
131 case MOG_FD_TYPE_IOSTAT:
132 case MOG_FD_TYPE_SELFWAKE:
133 ev = MOG_QEV_RD;
134 break;
135 case MOG_FD_TYPE_UNUSED:
136 case MOG_FD_TYPE_ACCEPT:
137 case MOG_FD_TYPE_FILE:
138 case MOG_FD_TYPE_QUEUE:
139 case MOG_FD_TYPE_SVC:
140 assert(0 && "invalid fd_type for mog_idleq_push");
141 default:
142 ev = MOG_QEV_WR;
143 break;
144 }
145 }
146 qpush(q, mfd, ev);
147 }
148
/*
 * Adds a mog_fd to the queue.  In this kqueue implementation this simply
 * forwards to mog_idleq_push() with the same arguments.
 */
void
mog_idleq_add(struct mog_queue *q, struct mog_fd *mfd, enum mog_qev ev)
{
	mog_idleq_push(q, mfd, ev);
}
153
154 struct mog_fd *
mog_queue_xchg(struct mog_queue * q,struct mog_fd * mfd,enum mog_qev ev)155 mog_queue_xchg(struct mog_queue *q, struct mog_fd *mfd, enum mog_qev ev)
156 {
157 /*
158 * kqueue() _should_ be able to implement this function with
159 * one syscall, however, we currently rely on mog_idleq_wait()
160 * being a cancellation point. So we must ensure the mfd is
161 * back in the queue (for other threads to access) before
162 * cancelling this thread...
163 */
164 mog_idleq_push(q, mfd, ev);
165
166 return mog_idleq_wait(q, -1);
167 }
168 #else /* ! HAVE_KQUEUE */
/*
 * placeholder declaration so that this translation unit is not empty
 * when kqueue support is compiled out
 */
typedef int avoid_empty_file;
170 #endif /* ! HAVE_KQUEUE */
171