1 /*
2 * Copyright (c) 2015 DeNA Co., Ltd., Kazuho Oku, Tatsuhiko Kubo
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to
6 * deal in the Software without restriction, including without limitation the
7 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
8 * sell copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
10 *
11 * The above copyright notice and this permission notice shall be included in
12 * all copies or substantial portions of the Software.
13 *
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20 * IN THE SOFTWARE.
21 */
22 #ifndef h2o__multithread_h
23 #define h2o__multithread_h
24
25 #include <pthread.h>
26 #include "h2o/linklist.h"
27 #include "h2o/socket.h"
28
29 typedef struct st_h2o_multithread_receiver_t h2o_multithread_receiver_t;
30 typedef struct st_h2o_multithread_queue_t h2o_multithread_queue_t;
31
32 typedef void (*h2o_multithread_receiver_cb)(h2o_multithread_receiver_t *receiver, h2o_linklist_t *messages);
33
34 struct st_h2o_multithread_receiver_t {
35 h2o_multithread_queue_t *queue;
36 h2o_linklist_t _link;
37 h2o_linklist_t _messages;
38 h2o_multithread_receiver_cb cb;
39 };
40
41 typedef struct st_h2o_multithread_message_t {
42 h2o_linklist_t link;
43 } h2o_multithread_message_t;
44
45 typedef struct st_h2o_sem_t {
46 pthread_mutex_t _mutex;
47 pthread_cond_t _cond;
48 ssize_t _cur;
49 ssize_t _capacity;
50 } h2o_sem_t;
51
52 typedef struct st_h2o_barrier_t {
53 pthread_mutex_t _mutex;
54 pthread_cond_t _cond;
55 size_t _count;
56 size_t _out_of_wait;
57 } h2o_barrier_t;
58
59 /**
60 * This structure is used to rate-limit the emission of error messages.
61 * When something succeeds, the user calls `h2o_error_reporter_record_success`. When something fails, the user calls
62 * `h2o_error_reporter_record_error`, along with how long the emission of the warning message should be delayed. When the delayed
63 * timer expires, the cusmo callback (registered using `H2O_ERROR_REPORTER_INITIALIZER` macro) is invoked, so that the user can emit
64 * whatever message that's necessary, alongside the number of successes and errors within the delayed period.
65 *
66 * Fields that do not start with `_` can be directly accessed / modified by the `report_errors` callback. In other occasions,
67 * modifications MUST be made through the "record" functions. Fields that start with `_` are private and must not be touched by the
68 * user.
69 */
70 typedef struct st_h2o_error_reporter_t {
71 uint64_t cur_errors;
72 uint64_t prev_successes;
73 uintptr_t data;
74 uint64_t _total_successes;
75 pthread_mutex_t _mutex;
76 h2o_timer_t _timer;
77 void (*_report_errors)(struct st_h2o_error_reporter_t *reporter, uint64_t tocal_succeses, uint64_t cur_successes);
78 } h2o_error_reporter_t;
79
80 /**
81 * creates a queue that is used for inter-thread communication
82 */
83 h2o_multithread_queue_t *h2o_multithread_create_queue(h2o_loop_t *loop);
84 /**
85 * destroys the queue
86 */
87 void h2o_multithread_destroy_queue(h2o_multithread_queue_t *queue);
88 /**
89 * registers a receiver for specific type of message
90 */
91 void h2o_multithread_register_receiver(h2o_multithread_queue_t *queue, h2o_multithread_receiver_t *receiver,
92 h2o_multithread_receiver_cb cb);
93 /**
94 * unregisters a receiver
95 */
96 void h2o_multithread_unregister_receiver(h2o_multithread_queue_t *queue, h2o_multithread_receiver_t *receiver);
97 /**
98 * sends a message (or set message to NULL to just wake up the receiving thread)
99 */
100 void h2o_multithread_send_message(h2o_multithread_receiver_t *receiver, h2o_multithread_message_t *message);
101 /**
102 * create a thread
103 */
104 void h2o_multithread_create_thread(pthread_t *tid, const pthread_attr_t *attr, void *(*func)(void *), void *arg);
105 /**
106 * returns the event loop associated with provided queue
107 */
108 h2o_loop_t *h2o_multithread_get_loop(h2o_multithread_queue_t *);
109
110 /**
111 * a variant of pthread_once, that does not require you to declare a callback, nor have a global variable
112 */
113 #define H2O_MULTITHREAD_ONCE(block) \
114 do { \
115 static volatile int lock = 0; \
116 int lock_loaded = lock; \
117 __sync_synchronize(); \
118 if (!lock_loaded) { \
119 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; \
120 pthread_mutex_lock(&mutex); \
121 if (!lock) { \
122 do { \
123 block \
124 } while (0); \
125 __sync_synchronize(); \
126 lock = 1; \
127 } \
128 pthread_mutex_unlock(&mutex); \
129 } \
130 } while (0)
131
132 void h2o_sem_init(h2o_sem_t *sem, ssize_t capacity);
133 void h2o_sem_destroy(h2o_sem_t *sem);
134 void h2o_sem_wait(h2o_sem_t *sem);
135 void h2o_sem_post(h2o_sem_t *sem);
136 void h2o_sem_set_capacity(h2o_sem_t *sem, ssize_t new_capacity);
137
138 void h2o_barrier_init(h2o_barrier_t *barrier, size_t count);
139 /**
140 * Waits for all threads to enter the barrier.
141 */
142 void h2o_barrier_wait(h2o_barrier_t *barrier);
143 /**
144 * Waits for all threads to enter the barrier, then returns before the barriers are released. True is returned on one thread, while
145 * while false is returned on other threads. On the thread that returned true, it is possible to run any action that has to be taken
146 * while all threads are within the barrier. All threads then must call `h2o_barrier_wait_post_sync_point`.
147 */
148 int h2o_barrier_wait_pre_sync_point(h2o_barrier_t *barrier);
149 void h2o_barrier_wait_post_sync_point(h2o_barrier_t *barrier);
150 int h2o_barrier_done(h2o_barrier_t *barrier);
151 void h2o_barrier_add(h2o_barrier_t *barrier, size_t delta);
152 void h2o_barrier_dispose(h2o_barrier_t *barrier);
153
154 void h2o_error_reporter__on_timeout(h2o_timer_t *timer);
155 #define H2O_ERROR_REPORTER_INITIALIZER(s) \
156 ((h2o_error_reporter_t){ \
157 ._mutex = PTHREAD_MUTEX_INITIALIZER, ._timer = {.cb = h2o_error_reporter__on_timeout}, ._report_errors = (s)})
158 static void h2o_error_reporter_record_success(h2o_error_reporter_t *reporter);
159 /**
160 * This function records an error event, sets a delayed timer (if not yet have been set), replaces the value of
161 * `h2o_error_reporter_t::data` with `new_data`, returning the old value.
162 */
163 uintptr_t h2o_error_reporter_record_error(h2o_loop_t *loop, h2o_error_reporter_t *reporter, uint64_t delay_ticks,
164 uintptr_t new_data);
165
166 /* inline functions */
167
h2o_error_reporter_record_success(h2o_error_reporter_t * reporter)168 inline void h2o_error_reporter_record_success(h2o_error_reporter_t *reporter)
169 {
170 __sync_fetch_and_add(&reporter->_total_successes, 1);
171 }
172
173 #endif
174