1 /*
2  * Copyright (c) 2015 DeNA Co., Ltd., Kazuho Oku, Tatsuhiko Kubo
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining a copy
5  * of this software and associated documentation files (the "Software"), to
6  * deal in the Software without restriction, including without limitation the
7  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
8  * sell copies of the Software, and to permit persons to whom the Software is
9  * furnished to do so, subject to the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included in
12  * all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20  * IN THE SOFTWARE.
21  */
22 #ifndef h2o__multithread_h
23 #define h2o__multithread_h
24 
25 #include <pthread.h>
26 #include "h2o/linklist.h"
27 #include "h2o/socket.h"
28 
29 typedef struct st_h2o_multithread_receiver_t h2o_multithread_receiver_t;
30 typedef struct st_h2o_multithread_queue_t h2o_multithread_queue_t;
31 
32 typedef void (*h2o_multithread_receiver_cb)(h2o_multithread_receiver_t *receiver, h2o_linklist_t *messages);
33 
34 struct st_h2o_multithread_receiver_t {
35     h2o_multithread_queue_t *queue;
36     h2o_linklist_t _link;
37     h2o_linklist_t _messages;
38     h2o_multithread_receiver_cb cb;
39 };
40 
41 typedef struct st_h2o_multithread_message_t {
42     h2o_linklist_t link;
43 } h2o_multithread_message_t;
44 
45 typedef struct st_h2o_sem_t {
46     pthread_mutex_t _mutex;
47     pthread_cond_t _cond;
48     ssize_t _cur;
49     ssize_t _capacity;
50 } h2o_sem_t;
51 
52 typedef struct st_h2o_barrier_t {
53     pthread_mutex_t _mutex;
54     pthread_cond_t _cond;
55     size_t _count;
56     size_t _out_of_wait;
57 } h2o_barrier_t;
58 
59 /**
60  * This structure is used to rate-limit the emission of error messages.
61  * When something succeeds, the user calls `h2o_error_reporter_record_success`. When something fails, the user calls
62  * `h2o_error_reporter_record_error`, along with how long the emission of the warning message should be delayed. When the delayed
63  * timer expires, the cusmo callback (registered using `H2O_ERROR_REPORTER_INITIALIZER` macro) is invoked, so that the user can emit
64  * whatever message that's necessary, alongside the number of successes and errors within the delayed period.
65  *
66  * Fields that do not start with `_` can be directly accessed / modified by the `report_errors` callback. In other occasions,
67  * modifications MUST be made through the "record" functions. Fields that start with `_` are private and must not be touched by the
68  * user.
69  */
70 typedef struct st_h2o_error_reporter_t {
71     uint64_t cur_errors;
72     uint64_t prev_successes;
73     uintptr_t data;
74     uint64_t _total_successes;
75     pthread_mutex_t _mutex;
76     h2o_timer_t _timer;
77     void (*_report_errors)(struct st_h2o_error_reporter_t *reporter, uint64_t tocal_succeses, uint64_t cur_successes);
78 } h2o_error_reporter_t;
79 
80 /**
81  * creates a queue that is used for inter-thread communication
82  */
83 h2o_multithread_queue_t *h2o_multithread_create_queue(h2o_loop_t *loop);
84 /**
85  * destroys the queue
86  */
87 void h2o_multithread_destroy_queue(h2o_multithread_queue_t *queue);
88 /**
89  * registers a receiver for specific type of message
90  */
91 void h2o_multithread_register_receiver(h2o_multithread_queue_t *queue, h2o_multithread_receiver_t *receiver,
92                                        h2o_multithread_receiver_cb cb);
93 /**
94  * unregisters a receiver
95  */
96 void h2o_multithread_unregister_receiver(h2o_multithread_queue_t *queue, h2o_multithread_receiver_t *receiver);
97 /**
98  * sends a message (or set message to NULL to just wake up the receiving thread)
99  */
100 void h2o_multithread_send_message(h2o_multithread_receiver_t *receiver, h2o_multithread_message_t *message);
101 /**
102  * create a thread
103  */
104 void h2o_multithread_create_thread(pthread_t *tid, const pthread_attr_t *attr, void *(*func)(void *), void *arg);
105 /**
106  * returns the event loop associated with provided queue
107  */
108 h2o_loop_t *h2o_multithread_get_loop(h2o_multithread_queue_t *);
109 
110 /**
111  * a variant of pthread_once, that does not require you to declare a callback, nor have a global variable
112  */
113 #define H2O_MULTITHREAD_ONCE(block)                                                                                                \
114     do {                                                                                                                           \
115         static volatile int lock = 0;                                                                                              \
116         int lock_loaded = lock;                                                                                                    \
117         __sync_synchronize();                                                                                                      \
118         if (!lock_loaded) {                                                                                                        \
119             static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;                                                              \
120             pthread_mutex_lock(&mutex);                                                                                            \
121             if (!lock) {                                                                                                           \
122                 do {                                                                                                               \
123                     block                                                                                                          \
124                 } while (0);                                                                                                       \
125                 __sync_synchronize();                                                                                              \
126                 lock = 1;                                                                                                          \
127             }                                                                                                                      \
128             pthread_mutex_unlock(&mutex);                                                                                          \
129         }                                                                                                                          \
130     } while (0)
131 
132 void h2o_sem_init(h2o_sem_t *sem, ssize_t capacity);
133 void h2o_sem_destroy(h2o_sem_t *sem);
134 void h2o_sem_wait(h2o_sem_t *sem);
135 void h2o_sem_post(h2o_sem_t *sem);
136 void h2o_sem_set_capacity(h2o_sem_t *sem, ssize_t new_capacity);
137 
138 void h2o_barrier_init(h2o_barrier_t *barrier, size_t count);
139 /**
140  * Waits for all threads to enter the barrier.
141  */
142 void h2o_barrier_wait(h2o_barrier_t *barrier);
143 /**
144  * Waits for all threads to enter the barrier, then returns before the barriers are released. True is returned on one thread, while
145  * while false is returned on other threads. On the thread that returned true, it is possible to run any action that has to be taken
146  * while all threads are within the barrier. All threads then must call `h2o_barrier_wait_post_sync_point`.
147  */
148 int h2o_barrier_wait_pre_sync_point(h2o_barrier_t *barrier);
149 void h2o_barrier_wait_post_sync_point(h2o_barrier_t *barrier);
150 int h2o_barrier_done(h2o_barrier_t *barrier);
151 void h2o_barrier_add(h2o_barrier_t *barrier, size_t delta);
152 void h2o_barrier_dispose(h2o_barrier_t *barrier);
153 
154 void h2o_error_reporter__on_timeout(h2o_timer_t *timer);
155 #define H2O_ERROR_REPORTER_INITIALIZER(s)                                                                                          \
156     ((h2o_error_reporter_t){                                                                                                       \
157         ._mutex = PTHREAD_MUTEX_INITIALIZER, ._timer = {.cb = h2o_error_reporter__on_timeout}, ._report_errors = (s)})
158 static void h2o_error_reporter_record_success(h2o_error_reporter_t *reporter);
159 /**
160  * This function records an error event, sets a delayed timer (if not yet have been set), replaces the value of
161  * `h2o_error_reporter_t::data` with `new_data`, returning the old value.
162  */
163 uintptr_t h2o_error_reporter_record_error(h2o_loop_t *loop, h2o_error_reporter_t *reporter, uint64_t delay_ticks,
164                                           uintptr_t new_data);
165 
166 /* inline functions */
167 
h2o_error_reporter_record_success(h2o_error_reporter_t * reporter)168 inline void h2o_error_reporter_record_success(h2o_error_reporter_t *reporter)
169 {
170     __sync_fetch_and_add(&reporter->_total_successes, 1);
171 }
172 
173 #endif
174