/*
 * %CopyrightBegin%
 *
 * Copyright Ericsson AB 2011-2018. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * %CopyrightEnd%
 */
20
/*
 * Description: Lock-free queue for communication between threads.
 *
 *              Currently only a many-to-one version has been
 *              implemented, i.e., many threads can enqueue but
 *              only one thread can dequeue at a time. It doesn't
 *              have to be the same thread dequeuing every time, but
 *              synchronization so that only one thread dequeues
 *              at a time has to be provided by other means.
 *
 *              When/If the need for a many-to-many queue arises,
 *              this implementation can relatively easily be extended
 *              to support that too.
 *
 *              Usage instructions can be found in erts_thr_queue.c
 *
 * Author: 	Rickard Green
 */
39
40 #ifndef ERL_THR_QUEUE_H__
41 #define ERL_THR_QUEUE_H__
42
43 #include "sys.h"
44 #include "erl_threads.h"
45 #include "erl_alloc.h"
46 #include "erl_thr_progress.h"
47
/*
 * Liveness setting stored in the `live' members of ErtsThrQInit_t,
 * ErtsThrQTail_t and ErtsThrQ_t_.
 *
 * NOTE(review): SHORT/LONG presumably describe an expected lifetime;
 * confirm the exact meaning against the implementation file.
 */
typedef enum {
    ERTS_THR_Q_LIVE_UNDEF,   /* 0 */
    ERTS_THR_Q_LIVE_SHORT,   /* 1 */
    ERTS_THR_Q_LIVE_LONG     /* 2 */
} ErtsThrQLive_t;
53
/*
 * Positional initializer for an ErtsThrQInit_t. The values are listed
 * in the member order of that struct, so the two must be kept in sync.
 */
#define ERTS_THR_Q_INIT_DEFAULT                                         \
{                                                                       \
    {                                                                   \
        ERTS_THR_Q_LIVE_UNDEF,      /* live.queue */                    \
        ERTS_THR_Q_LIVE_SHORT       /* live.objects */                  \
    },                                                                  \
    NULL,                           /* arg */                           \
    NULL,                           /* notify */                        \
    1                               /* auto_finalize_dequeue */         \
}
64
/* Queue handle type; struct ErtsThrQ_t_ is defined further down in this header. */
typedef struct ErtsThrQ_t_ ErtsThrQ_t;
66
67 typedef struct {
68 struct {
69 ErtsThrQLive_t queue;
70 ErtsThrQLive_t objects;
71 } live;
72 void *arg;
73 void (*notify)(void *);
74 int auto_finalize_dequeue;
75 } ErtsThrQInit_t;
76
77 typedef struct ErtsThrQElement_t_ ErtsThrQElement_t;
78 typedef struct ErtsThrQElement_t ErtsThrQPrepEnQ_t;
79
80 struct ErtsThrQElement_t_ {
81 erts_atomic_t next;
82 union {
83 erts_atomic_t atmc;
84 void *ptr;
85 } data;
86 };
87
88 typedef struct {
89 ErtsThrQElement_t *start;
90 ErtsThrQElement_t *end;
91 } ErtsThrQFinDeQ_t;
92
/*
 * Result type of erts_thr_q_finalize(), erts_thr_q_destroy(),
 * erts_thr_q_clean() and erts_thr_q_inspect().
 *
 * No trailing comma after the last enumerator: ISO C90 does not allow
 * one, and this matches the style of ErtsThrQLive_t.
 */
typedef enum {
    ERTS_THR_Q_CLEAN,            /* 0 */
    ERTS_THR_Q_NEED_THR_PRGR,    /* 1 */
    ERTS_THR_Q_DIRTY             /* 2 */
} ErtsThrQCleanState_t;
98
99
100 typedef struct {
101 ErtsThrQElement_t marker;
102 erts_atomic_t last;
103 erts_atomic_t um_refc[2];
104 erts_atomic32_t um_refc_ix;
105 ErtsThrQLive_t live;
106 erts_atomic32_t thr_prgr_clean_scheduled;
107 void *arg;
108 void (*notify)(void *);
109 } ErtsThrQTail_t;
110
111 struct ErtsThrQ_t_ {
112 /*
113 * This structure needs to be cache line aligned for best
114 * performance.
115 */
116 union {
117 /* Modified by threads enqueuing */
118 ErtsThrQTail_t data;
119 char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsThrQTail_t))];
120 } tail;
121 /*
122 * Everything below this point is *only* accessed by the
123 * thread dequeuing.
124 */
125 struct {
126 erts_atomic_t head;
127 ErtsThrQLive_t live;
128 ErtsThrQElement_t *first;
129 ErtsThrQElement_t *unref_end;
130 int clean_reached_head_count;
131 struct {
132 int automatic;
133 ErtsThrQElement_t *start;
134 ErtsThrQElement_t *end;
135 } deq_fini;
136 struct {
137 ErtsThrPrgrVal thr_progress;
138 int thr_progress_reached;
139 int um_refc_ix;
140 ErtsThrQElement_t *unref_end;
141 } next;
142 int used_marker;
143 void *arg;
144 void (*notify)(void *);
145 } head;
146 struct {
147 int finalizing;
148 ErtsThrQLive_t live;
149 void *blk;
150 } q;
151 };
152
153
154 void erts_thr_q_init(void);
155 void erts_thr_q_initialize(ErtsThrQ_t *, ErtsThrQInit_t *);
156 ErtsThrQCleanState_t erts_thr_q_finalize(ErtsThrQ_t *);
157 ErtsThrQ_t *erts_thr_q_create(ErtsThrQInit_t *);
158 ErtsThrQCleanState_t erts_thr_q_destroy(ErtsThrQ_t *);
159 ErtsThrQCleanState_t erts_thr_q_clean(ErtsThrQ_t *);
160 ErtsThrQCleanState_t erts_thr_q_inspect(ErtsThrQ_t *, int);
161 ErtsThrQPrepEnQ_t *erts_thr_q_prepare_enqueue(ErtsThrQ_t *);
162 void erts_thr_q_enqueue_prepared(ErtsThrQ_t *, void *, ErtsThrQPrepEnQ_t *);
163 void erts_thr_q_enqueue(ErtsThrQ_t *, void *);
164 void * erts_thr_q_dequeue(ErtsThrQ_t *);
165 int erts_thr_q_get_finalize_dequeue_data(ErtsThrQ_t *,
166 ErtsThrQFinDeQ_t *);
167 void erts_thr_q_append_finalize_dequeue_data(ErtsThrQFinDeQ_t *,
168 ErtsThrQFinDeQ_t *);
169 int erts_thr_q_finalize_dequeue(ErtsThrQFinDeQ_t *);
170 void erts_thr_q_finalize_dequeue_state_init(ErtsThrQFinDeQ_t *);
171
/* Only declared when the build defines USE_LTTNG_VM_TRACEPOINTS. */
#ifdef USE_LTTNG_VM_TRACEPOINTS
int erts_thr_q_length_dirty(ErtsThrQ_t *);
#endif
175
176 ERTS_GLB_INLINE ErtsThrPrgrVal erts_thr_q_need_thr_progress(ErtsThrQ_t *q);
177
178 #if ERTS_GLB_INLINE_INCL_FUNC_DEF
179
180 ERTS_GLB_INLINE ErtsThrPrgrVal
erts_thr_q_need_thr_progress(ErtsThrQ_t * q)181 erts_thr_q_need_thr_progress(ErtsThrQ_t *q)
182 {
183 return q->head.next.thr_progress;
184 }
185
186 #endif /* ERTS_GLB_INLINE_INCL_FUNC_DEF */
187
188 #endif /* ERL_THR_QUEUE_H__ */
189