xref: /openbsd/usr.sbin/smtpd/runq.c (revision fc61954a)
1 /*	$OpenBSD: runq.c,v 1.2 2015/01/20 17:37:54 deraadt Exp $	*/
2 
3 /*
4  * Copyright (c) 2013 Eric Faurot <eric@openbsd.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include <sys/types.h>
20 #include <sys/socket.h>
21 #include <sys/queue.h>
22 #include <sys/tree.h>
23 #include <sys/uio.h>
24 
25 #include <imsg.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <limits.h>
29 #include <time.h>
30 
31 #include "smtpd.h"
32 
33 struct job {
34 	TAILQ_ENTRY(job)	 entry;
35 	time_t			 when;
36 	void			(*cb)(struct runq *, void *);
37 	void			*arg;
38 };
39 
40 struct runq {
41 	TAILQ_HEAD(, job)	 jobs;
42 	void			(*cb)(struct runq *, void *);
43 	struct event		 ev;
44 };
45 
46 static void runq_timeout(int, short, void *);
47 
48 static struct runq *active;
49 
50 static void
51 runq_reset(struct runq *runq)
52 {
53 	struct timeval	 tv;
54 	struct job	*job;
55 	time_t		 now;
56 
57 	job = TAILQ_FIRST(&runq->jobs);
58 	if (job == NULL)
59 		return;
60 
61 	now = time(NULL);
62 	if (job->when <= now)
63 		tv.tv_sec = 0;
64 	else
65 		tv.tv_sec = job->when - now;
66 	tv.tv_usec = 0;
67 	evtimer_add(&runq->ev, &tv);
68 }
69 
70 static void
71 runq_timeout(int fd, short ev, void *arg)
72 {
73 	struct runq	*runq = arg;
74 	struct job	*job;
75 	time_t		 now;
76 
77 	active = runq;
78 	now = time(NULL);
79 
80 	while((job = TAILQ_FIRST(&runq->jobs))) {
81 		if (job->when > now)
82 			break;
83 		TAILQ_REMOVE(&runq->jobs, job, entry);
84 		if (job->cb)
85 			job->cb(runq, job->arg);
86 		else
87 			runq->cb(runq, job->arg);
88 		free(job);
89 	}
90 
91 	active = NULL;
92 	runq_reset(runq);
93 }
94 
95 int
96 runq_init(struct runq **runqp, void (*cb)(struct runq *, void *))
97 {
98 	struct runq	*runq;
99 
100 	runq = malloc(sizeof(*runq));
101 	if (runq == NULL)
102 		return (0);
103 
104 	runq->cb = cb;
105 	TAILQ_INIT(&runq->jobs);
106 	evtimer_set(&runq->ev, runq_timeout, runq);
107 
108 	*runqp = runq;
109 
110 	return (1);
111 }
112 
113 int
114 runq_schedule(struct runq *runq, time_t when, void (*cb)(struct runq *, void *),
115     void *arg)
116 {
117 	struct job	*job, *tmpjob;
118 
119 	job = malloc(sizeof(*job));
120 	if (job == NULL)
121 		return (0);
122 
123 	job->arg = arg;
124 	job->cb = cb;
125 	job->when = when;
126 
127 	TAILQ_FOREACH(tmpjob, &runq->jobs, entry) {
128 		if (tmpjob->when > job->when) {
129 			TAILQ_INSERT_BEFORE(tmpjob, job, entry);
130 			goto done;
131 		}
132 	}
133 	TAILQ_INSERT_TAIL(&runq->jobs, job, entry);
134 
135     done:
136 	if (runq != active && job == TAILQ_FIRST(&runq->jobs)) {
137 		evtimer_del(&runq->ev);
138 		runq_reset(runq);
139 	}
140 	return (1);
141 }
142 
143 int
144 runq_delay(struct runq *runq, unsigned int delay,
145     void (*cb)(struct runq *, void *), void *arg)
146 {
147 	return runq_schedule(runq, time(NULL) + delay, cb, arg);
148 }
149 
150 int
151 runq_cancel(struct runq *runq, void (*cb)(struct runq *, void *), void *arg)
152 {
153 	struct job	*job, *first;
154 
155 	first = TAILQ_FIRST(&runq->jobs);
156 	TAILQ_FOREACH(job, &runq->jobs, entry) {
157 		if (job->cb == cb && job->arg == arg) {
158 			TAILQ_REMOVE(&runq->jobs, job, entry);
159 			free(job);
160 			if (runq != active && job == first) {
161 				evtimer_del(&runq->ev);
162 				runq_reset(runq);
163 			}
164 			return (1);
165 		}
166 	}
167 
168 	return (0);
169 }
170 
171 int
172 runq_pending(struct runq *runq, void (*cb)(struct runq *, void *), void *arg,
173     time_t *when)
174 {
175 	struct job	*job;
176 
177 	TAILQ_FOREACH(job, &runq->jobs, entry) {
178 		if (job->cb == cb && job->arg == arg) {
179 			if (when)
180 				*when = job->when;
181 			return (1);
182 		}
183 	}
184 
185 	return (0);
186 }
187 
188 int
189 runq_next(struct runq *runq, void (**cb)(struct runq *, void *), void **arg,
190     time_t *when)
191 {
192 	struct job	*job;
193 
194 	job = TAILQ_FIRST(&runq->jobs);
195 	if (job == NULL)
196 		return (0);
197 	if (cb)
198 		*cb = job->cb;
199 	if (arg)
200 		*arg = job->arg;
201 	if (when)
202 		*when = job->when;
203 
204 	return (1);
205 }
206