1 /*------------------------------------------------------------------------------
2  *
3  * Copyright (c) 2011-2021, EURid vzw. All rights reserved.
4  * The YADIFA TM software product is provided under the BSD 3-clause license:
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  *
10  *        * Redistributions of source code must retain the above copyright
11  *          notice, this list of conditions and the following disclaimer.
12  *        * Redistributions in binary form must reproduce the above copyright
13  *          notice, this list of conditions and the following disclaimer in the
14  *          documentation and/or other materials provided with the distribution.
15  *        * Neither the name of EURid nor the names of its contributors may be
16  *          used to endorse or promote products derived from this software
17  *          without specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
23  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29  * POSSIBILITY OF SUCH DAMAGE.
30  *
31  *------------------------------------------------------------------------------
32  *
33  */
34 
35 #ifndef ASYNC_H
36 #define	ASYNC_H
37 
38 #define ASYNC_QUEUE_TYPE_RINGBUFFER 1
39 #define ASYNC_QUEUE_TYPE_DLL        2
40 
41 #define ASYNC_QUEUE_TYPE ASYNC_QUEUE_TYPE_DLL
42 
43 #if ASYNC_QUEUE_TYPE == ASYNC_QUEUE_TYPE_DLL
44 #include <dnscore/threaded_dll_cw.h>
45 #elif ASYNC_QUEUE_TYPE == ASYNC_QUEUE_TYPE_RINGBUFFER
46 #include <dnscore/threaded_ringbuffer_cw.h>
47 #else
48 #error "ASYNC_QUEUE_TYPE not set"
49 #endif
50 #include <dnscore/pace.h>
51 
52 #define ASYNC_WAIT_TAG 1
53 
54 #ifdef	__cplusplus
55 extern "C" {
56 #endif
57 
58 /**
59  *
60  * Typically, the handler will be a small function that pushes the message in another processing queue,
61  * with or without changing the id (so processors can be chained)
62  *
63  * The handler can also be a function that allows another function to proceed (delegation mechanism, in the .net linguo)
64  * This would make the handling synchronous but the processing parallel
65  *
66  */
67 
68 #define FREEBSD12_TEST 0
69 
70 // used by threads to process a task then push it further in the assembly line
71 
72 struct async_message_s;
73 
74 typedef void async_done_callback(struct async_message_s *msg);
75 
76 struct async_wait_s
77 {
78     mutex_t mutex;
79     cond_t  cond_wait;
80     volatile s32 wait_count;
81     volatile s32 error_code;
82 #if ASYNC_WAIT_TAG
83     u32 tag;
84 #endif
85 };
86 
87 typedef struct async_wait_s async_wait_s;
88 
89 struct async_queue_s
90 {
91 #if ASYNC_QUEUE_TYPE == ASYNC_QUEUE_TYPE_DLL
92     threaded_dll_cw queue;
93 #elif ASYNC_QUEUE_TYPE == ASYNC_QUEUE_TYPE_RINGBUFFER
94     threaded_ringbuffer_cw queue;
95 #else
96     threaded_queue queue;
97 #endif
98 
99     pace_s pace;
100 };
101 
102 typedef struct async_queue_s async_queue_s;
103 
104 struct async_message_s
105 {
106     s32 id;         // the message id for the processor
107     s32 error_code; // the error code to be set by the processor
108     void *args;     // the arguments for the processor
109 
110     async_done_callback *handler; // what must be called when the processor has finished working
111     void *handler_args;             // complementary arguments for the handler
112 
113     volatile s64 start_time;
114 };
115 
116 typedef struct async_message_s async_message_s;
117 
118 /**
119  *
120  * Initialises a synchronisation point
121  * count is the number of releases to do before the async_wait call returns
122  *
123  * @param aw
124  * @param count
125  * @return
126  */
127 
128 void async_wait_init(async_wait_s *aw, s32 count);
129 
130 /**
131  *
132  * Destroys the synchronisation point
133  *
134  * @param aw
135  * @return
136  */
137 
138 void async_wait_finalize(async_wait_s *aw);
139 
140 /**
141  *
142  * Initialises a synchronisation point
143  * count is the number of releases to do before the async_wait call returns
144  *
145  * @param aw
146  * @param count
147  * @return
148  */
149 
150 async_wait_s *async_wait_new_instance(s32 count);
151 void async_wait_destroy(async_wait_s *aw);
152 
153 async_wait_s *async_wait_create_shared(u8 id, s32 count);
154 void async_wait_destroy_shared(async_wait_s *aw);
155 
156 /**
157  * Waits until the count has be reduced to 0 (or below if something bad is going on)
158  *
159  * @param aw
160  * @return
161  */
162 
163 void async_wait(async_wait_s *aw);
164 
165 /**
166  * Waits until the count has be reduced to 0 (or below if something bad is going on)
167  *    OR until the amount of microseconds has elapsed.
168  *
169  * @param aw
170  * @param usec
171  * @return true if and only if the wait counter reached 0
172  */
173 
174 bool async_wait_timeout(async_wait_s *aw, u64 usec);
175 
176 /**
177  * Waits until the count has be reduced to 0 (or below if something bad is going on)
178  *    OR until the epoch in microseconds has been reached.
179  *
180  * @param aw
181  * @param usec
182  * @return true if and only if the wait counter reached 0
183  */
184 
185 bool async_wait_timeout_absolute(async_wait_s *aw, u64 epoch_usec);
186 
187 /**
188  * Returns the current value of the counter
189  *
190  * @param aw
191  * @return
192  */
193 
194 s32 async_wait_get_counter(async_wait_s *aw);
195 
196 /**
197  *
198  * Decreases the count of that amount
199  *
200  * @param aw
201  * @param count
202  * @return
203  */
204 
205 void async_wait_progress(async_wait_s *aw, s32 count);
206 
207 void async_wait_set_first_error(async_wait_s *aw, s32 error);
208 
209 s32 async_wait_get_error(async_wait_s *aw);
210 
211 void async_queue_init(async_queue_s *q, u32 size, u64 min_us, u64 max_us, const char* name);
212 
213 void async_queue_finalize(async_queue_s *q);
214 
215 bool async_queue_empty(async_queue_s *q);
216 
217 u32 async_queue_size(async_queue_s *q);
218 
219 async_message_s *async_message_next(async_queue_s *queue);
220 
221 async_message_s* async_message_try_next(async_queue_s *queue);
222 
223 /**
224  *
225  * Pushes the message to the queue.
226  * The queue is supposed to be read in another thread
227  *
228  * @param queue
229  * @param msg
230  */
231 
232 void async_message_call(async_queue_s *queue, async_message_s *msg);
233 
234 /**
235  * Sets the handler and handler_args fields to make the message a waiter
236  * Restores the fields before returning
237  *
238  * @param msg
239  */
240 
241 int async_message_call_and_wait(async_queue_s *queue, async_message_s *msg);
242 
243 /**
244  * Sets the handler and handler_args fields to ignore the result
245  * (fire and forget)
246  *
247  * @param msg
248  */
249 
250 void async_message_call_and_forget(async_queue_s *queue, async_message_s *msg);
251 
252 /**
253  * Sets the handler and handler_args fields to ignore the result
254  * (fire and forget)
255  *
256  * @param msg
257  */
258 
259 void async_message_call_and_release(async_queue_s *queue, async_message_s *msg);
260 
261 void async_message_pool_init();
262 async_message_s *async_message_alloc();
263 void async_message_release(async_message_s *msg);
264 void async_message_pool_finalize();
265 
266 #ifdef	__cplusplus
267 }
268 #endif
269 
270 #endif	/* ASYNC_H */
271 
272