1 /*
2  * %CopyrightBegin%
3  *
4  * Copyright Ericsson AB 2011-2018. All Rights Reserved.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * %CopyrightEnd%
19  */
20 
21 /*
22  * Description: Lock-free queue for communication between threads.
23  *
24  *              Currently only a many-to-one version has been
25  *              implemented, i.e., many threads can enqueue but
26  *              only one thread can dequeue at a time. It doesn't
27  *              have to be the same thread dequeuing every time, but
28  *              synchronization so that only one thread dequeues
29  *              at a time has to be provided by other means.
30  *
31  *              When/If the need for a many-to-many queue arises,
32  *              this implementation can relatively easily be extended
33  *              to support that too.
34  *
35  *              Usage instructions can be found in erl_thr_queue.c
36  *
37  * Author: 	Rickard Green
38  */
39 
40 #ifndef ERL_THR_QUEUE_H__
41 #define ERL_THR_QUEUE_H__
42 
43 #include "sys.h"
44 #include "erl_threads.h"
45 #include "erl_alloc.h"
46 #include "erl_thr_progress.h"
47 
/*
 * Classification used for the `live' settings of a queue: it is the type
 * of ErtsThrQInit_t.live.queue / .live.objects and of the `live' members
 * stored in the tail, head and q parts of struct ErtsThrQ_t_.
 *
 * NOTE(review): presumably expresses the expected lifetime (short/long) of
 * the queue or of the enqueued objects -- confirm against the
 * implementation file.
 */
48 typedef enum {
49     ERTS_THR_Q_LIVE_UNDEF,	/* first enumerator, i.e. value 0 */
50     ERTS_THR_Q_LIVE_SHORT,
51     ERTS_THR_Q_LIVE_LONG
52 } ErtsThrQLive_t;
53 
/*
 * Brace-enclosed default initializer whose shape matches ErtsThrQInit_t.
 * The values are positional and follow that type's member order:
 *
 *   live.queue            = ERTS_THR_Q_LIVE_UNDEF
 *   live.objects          = ERTS_THR_Q_LIVE_SHORT
 *   arg                   = NULL
 *   notify                = NULL
 *   auto_finalize_dequeue = 1
 *
 * Because the initializer is positional, it has to be kept in sync with
 * any reordering or addition of members in ErtsThrQInit_t.
 */
54 #define ERTS_THR_Q_INIT_DEFAULT						\
55 {									\
56     {									\
57 	ERTS_THR_Q_LIVE_UNDEF,						\
58 	ERTS_THR_Q_LIVE_SHORT						\
59     },									\
60     NULL,								\
61     NULL,								\
62     1									\
63 }
64 
/*
 * Forward declaration of the queue type; struct ErtsThrQ_t_ itself is
 * defined further down in this header.
 */
65 typedef struct ErtsThrQ_t_ ErtsThrQ_t;
66 
/*
 * Initialization parameters; a pointer to this type is taken by
 * erts_thr_q_initialize() and erts_thr_q_create().
 * ERTS_THR_Q_INIT_DEFAULT is a positional default initializer for it.
 */
67 typedef struct {
68     struct {
69 	ErtsThrQLive_t queue;	/* `live' value for the queue */
70 	ErtsThrQLive_t objects;	/* `live' value for the objects */
71     } live;
72     void *arg;			/* NOTE(review): presumably the argument given to notify() -- confirm */
73     void (*notify)(void *);	/* callback; when it is invoked is not visible in this header */
74     int auto_finalize_dequeue;	/* int flag; ERTS_THR_Q_INIT_DEFAULT sets it to 1 */
75 } ErtsThrQInit_t;
76 
/*
 * Queue element types.
 *
 * NOTE(review): ErtsThrQPrepEnQ_t aliases `struct ErtsThrQElement_t' (no
 * trailing underscore). That is a different tag from
 * `struct ErtsThrQElement_t_' and it is not defined anywhere in this
 * header, so ErtsThrQPrepEnQ_t is an incomplete type here and can only be
 * handled through pointers. Confirm whether this is an intentional opaque
 * handle or a typo before changing it.
 */
77 typedef struct ErtsThrQElement_t_ ErtsThrQElement_t;
78 typedef struct ErtsThrQElement_t ErtsThrQPrepEnQ_t;
79 
/*
 * One element: an atomic `next' member plus a payload that can be accessed
 * either as an atomic or as a plain pointer.
 */
80 struct ErtsThrQElement_t_ {
81     erts_atomic_t next;		/* presumably the link to the following element -- confirm */
82     union {
83 	erts_atomic_t atmc;	/* payload viewed as an atomic */
84 	void *ptr;		/* payload viewed as a plain pointer */
85     } data;
86 };
87 
/*
 * Pair of element pointers used as state by the finalize-dequeue
 * functions (erts_thr_q_get_finalize_dequeue_data(),
 * erts_thr_q_append_finalize_dequeue_data(), erts_thr_q_finalize_dequeue()
 * and erts_thr_q_finalize_dequeue_state_init()), all of which take a
 * pointer to this type.
 *
 * NOTE(review): presumably delimits a range of dequeued elements that
 * still have to be finalized -- confirm against the implementation.
 */
88 typedef struct {
89     ErtsThrQElement_t *start;
90     ErtsThrQElement_t *end;
91 } ErtsThrQFinDeQ_t;
92 
/*
 * Result type of erts_thr_q_finalize(), erts_thr_q_destroy(),
 * erts_thr_q_clean() and erts_thr_q_inspect().
 *
 * NOTE(review): the exact meaning of each state is defined by the
 * implementation; the names suggest "nothing left to clean", "waiting for
 * thread progress" and "more cleaning needed" -- confirm there.
 */
93 typedef enum {
94     ERTS_THR_Q_CLEAN,
95     ERTS_THR_Q_NEED_THR_PRGR,
96     ERTS_THR_Q_DIRTY,
97 } ErtsThrQCleanState_t;
98 
99 
/*
 * Tail part of a queue. It is embedded as `tail.data' in
 * struct ErtsThrQ_t_, where it is described as the part modified by
 * threads enqueuing.
 */
100 typedef struct {
101     ErtsThrQElement_t marker;	/* element stored inline in the queue (not a pointer) */
102     erts_atomic_t last;
103     erts_atomic_t um_refc[2];	/* two atomic counters ... */
104     erts_atomic32_t um_refc_ix;	/* ... NOTE(review): presumably an index selecting one of them -- confirm */
105     ErtsThrQLive_t live;
106     erts_atomic32_t thr_prgr_clean_scheduled;
107     void *arg;			/* NOTE(review): presumably the argument given to notify() -- confirm */
108     void (*notify)(void *);
109 } ErtsThrQTail_t;
110 
/*
 * The queue itself (typedef'ed as ErtsThrQ_t). It consists of three parts:
 *   tail - data modified by enqueuing threads, padded via `align__';
 *   head - data only accessed by the dequeuing thread;
 *   q    - additional per-queue fields (finalizing flag, live value, blk).
 */
111 struct ErtsThrQ_t_ {
112     /*
113      * This structure needs to be cache line aligned for best
114      * performance.
115      */
116     union {
117 	/* Modified by threads enqueuing */
118 	ErtsThrQTail_t data;
	/*
	 * Makes the union at least
	 * ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsThrQTail_t)) bytes large.
	 */
119 	char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsThrQTail_t))];
120     } tail;
121     /*
122      * Everything below this point is *only* accessed by the
123      * thread dequeuing.
124      */
125     struct {
126 	erts_atomic_t head;
127 	ErtsThrQLive_t live;
128 	ErtsThrQElement_t *first;
129 	ErtsThrQElement_t *unref_end;
130 	int clean_reached_head_count;
	/* Same start/end pointer pair as in ErtsThrQFinDeQ_t, plus a flag. */
131 	struct {
132 	    int automatic;
133 	    ErtsThrQElement_t *start;
134 	    ErtsThrQElement_t *end;
135 	} deq_fini;
136 	struct {
	    /* Value returned by erts_thr_q_need_thr_progress(). */
137 	    ErtsThrPrgrVal thr_progress;
138 	    int thr_progress_reached;
139 	    int um_refc_ix;
140 	    ErtsThrQElement_t *unref_end;
141 	} next;
142 	int used_marker;
143 	void *arg;
144 	void (*notify)(void *);
145     } head;
146     struct {
147 	int finalizing;
148 	ErtsThrQLive_t live;
149 	void *blk;	/* NOTE(review): presumably the underlying allocated block -- confirm */
150     } q;
151 };
152 
153 
/*
 * Queue API. Only the signatures are visible in this header; the exact
 * semantics and usage rules are defined by the implementation file (see
 * the description comment at the top of this header).
 */

/* Set-up and tear-down. */
154 void erts_thr_q_init(void);
155 void erts_thr_q_initialize(ErtsThrQ_t *, ErtsThrQInit_t *);
156 ErtsThrQCleanState_t erts_thr_q_finalize(ErtsThrQ_t *);
157 ErtsThrQ_t *erts_thr_q_create(ErtsThrQInit_t *);
158 ErtsThrQCleanState_t erts_thr_q_destroy(ErtsThrQ_t *);

/* Cleaning / inspection; both report an ErtsThrQCleanState_t. */
159 ErtsThrQCleanState_t erts_thr_q_clean(ErtsThrQ_t *);
160 ErtsThrQCleanState_t erts_thr_q_inspect(ErtsThrQ_t *, int);

/*
 * Enqueue. NOTE(review): the ErtsThrQPrepEnQ_t pointer returned by
 * erts_thr_q_prepare_enqueue() is presumably the one later passed as the
 * third argument of erts_thr_q_enqueue_prepared() -- confirm.
 */
161 ErtsThrQPrepEnQ_t *erts_thr_q_prepare_enqueue(ErtsThrQ_t *);
162 void erts_thr_q_enqueue_prepared(ErtsThrQ_t *, void *, ErtsThrQPrepEnQ_t *);
163 void erts_thr_q_enqueue(ErtsThrQ_t *, void *);

/*
 * Dequeue. Per the description at the top of this header, only one thread
 * may dequeue at a time and callers must provide that synchronization.
 */
164 void * erts_thr_q_dequeue(ErtsThrQ_t *);

/* Finalize-dequeue handling; all of these operate on ErtsThrQFinDeQ_t. */
165 int erts_thr_q_get_finalize_dequeue_data(ErtsThrQ_t *,
166 					 ErtsThrQFinDeQ_t *);
167 void erts_thr_q_append_finalize_dequeue_data(ErtsThrQFinDeQ_t *,
168 					     ErtsThrQFinDeQ_t *);
169 int erts_thr_q_finalize_dequeue(ErtsThrQFinDeQ_t *);
170 void erts_thr_q_finalize_dequeue_state_init(ErtsThrQFinDeQ_t *);
171 
/* Only declared when USE_LTTNG_VM_TRACEPOINTS is defined. */
172 #ifdef USE_LTTNG_VM_TRACEPOINTS
173 int erts_thr_q_length_dirty(ErtsThrQ_t *);
174 #endif
175 
/*
 * erts_thr_q_need_thr_progress()
 *
 * Returns the thread progress value currently stored in
 * q->head.next.thr_progress. The function only reads the queue; it does
 * not modify it.
 *
 * The field lives in the `head' part of the queue, which according to the
 * comment in struct ErtsThrQ_t_ is only accessed by the dequeuing thread.
 *
 * q: the queue to read from; it is dereferenced unconditionally, so it
 *    must not be NULL.
 *
 * The declaration is always visible; the definition is only compiled
 * where ERTS_GLB_INLINE_INCL_FUNC_DEF is non-zero.
 */
176 ERTS_GLB_INLINE ErtsThrPrgrVal erts_thr_q_need_thr_progress(ErtsThrQ_t *q);
177 
178 #if ERTS_GLB_INLINE_INCL_FUNC_DEF
179 
180 ERTS_GLB_INLINE ErtsThrPrgrVal
181 erts_thr_q_need_thr_progress(ErtsThrQ_t *q)
182 {
183     return q->head.next.thr_progress;
184 }
185 
186 #endif /* ERTS_GLB_INLINE_INCL_FUNC_DEF */
187 
188 #endif /* ERL_THR_QUEUE_H__ */
189