1 /*
2 * %CopyrightBegin%
3 *
4 * Copyright Ericsson AB 2018-2020. All Rights Reserved.
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * %CopyrightEnd%
19 */
20
21 /*
22 * Description: Process signal queue implementation.
23 *
24 * Currently the following signals are handled:
25 * - Messages
26 * - Exit
27 * - Monitor
28 * - Demonitor
29 * - Monitor down
30 * - Persistent monitor message
31 * - Link
32 * - Unlink
33 * - Unlink Ack
34 * - Group leader
35 * - Is process alive
36 * - Process info request
37 * - Suspend request (monitor of suspend type)
38 * - Resume request (demonitor of suspend type)
39 * - Suspend cleanup (monitor down of suspend type)
40 * - Sync suspend
41 * - RPC request
42 * - Trace change
43 *
44 * The signal queue consists of three parts:
45 * - Outer queue (sig_inq field in process struct)
46 * - Middle queue (sig_qs field in process struct)
47 * - Inner queue (sig_qs field in process struct)
48 *
49 * Incoming signals are placed in the outer queue
50 * by other processes, ports, or by the runtime system
51 * itself. This queue is protected by the msgq process
52 * lock and may be accessed by any other entity. While
53 * a signal is located in the outer queue, it is still
54 * in transit between sender and receiver.
55 *
56 * The middle and the inner queues are private to the
57 * receiving process and can only be accessed while
58 * holding the main process lock. The signal changes
59 * from being in transit to being received while in
60 * the middle queue. Non-message signals are handled
61 * immediately upon reception while message signals
62 * are moved into the inner queue.
63 *
64 * In the outer and middle queues both message signals
65 * and non-message signals are mixed. Signals in these
66 * queues are referenced using two single linked lists.
67 * One single linked list that goes through all signals
68 * in the queue and another single linked list that
69 * goes through only non-message signals. The list
70 * through the non-message signals is used for fast
71 * access to these signals in the middle queue, since
72 * these should be handled immediately upon reception.
73 *
74 * The inner queue consists only of one single linked
75 * list through the message signals. A receive
76 * expression can only operate on messages once they
77 * have entered the inner queue.
78 *
79 * Author: Rickard Green
80 */
81
82 #ifndef ERTS_PROC_SIG_QUEUE_H_TYPE__
83 #define ERTS_PROC_SIG_QUEUE_H_TYPE__
84
85 #if 0 /* change to 1 to enable the ERTS_HDBG_CHECK_SIGNAL_*() queue checks declared in this header */
86 # define ERTS_PROC_SIG_HARD_DEBUG
87 #endif
88 #if 0 /* change to 1 to define the switch below; its consumers are not visible in this header */
89 # define ERTS_PROC_SIG_HARD_DEBUG_SIGQ_MSG_LEN
90 #endif
91 #if 0 /* change to 1 to enable ERTS_HDBG_CHK_RECV_MRKS() declared in this header */
92 # define ERTS_PROC_SIG_HARD_DEBUG_RECV_MARKER
93 #endif
94
95 struct erl_mesg; /* forward declaration; only pointers to it are used in this part of the header */
96 struct erl_dist_external; /* forward declaration */
97
98 typedef struct { /* fields embedded as 'common' in signal structures such as ErtsSigUnlinkOp */
99 struct erl_mesg *next; /* link to the next element of a singly linked list of struct erl_mesg */
100 union {
101 struct erl_mesg **next; /* NOTE(review): presumably the link of the separate non-message signal list described in the file header -- confirm */
102 void *attachment; /* alternative use of the same storage; its meaning is not visible in this header */
103 } specific;
104 Eterm tag; /* compared against ERTS_PROC_SIG_MAKE_TAG() values by the ERTS_SIG_IS_*() macros */
105 } ErtsSignalCommon;
106 /*
107 * Note that not all signals are handled using this functionality!
108 */
109
110 #define ERTS_SIG_Q_OP_MAX 18 /* highest operation number defined below */
111
112 #define ERTS_SIG_Q_OP_EXIT 0 /* Exit signal due to bif call */
113 #define ERTS_SIG_Q_OP_EXIT_LINKED 1 /* Exit signal due to link break */
114 #define ERTS_SIG_Q_OP_MONITOR_DOWN 2
115 #define ERTS_SIG_Q_OP_MONITOR 3
116 #define ERTS_SIG_Q_OP_DEMONITOR 4
117 #define ERTS_SIG_Q_OP_LINK 5
118 #define ERTS_SIG_Q_OP_UNLINK 6
119 #define ERTS_SIG_Q_OP_GROUP_LEADER 7
120 #define ERTS_SIG_Q_OP_TRACE_CHANGE_STATE 8
121 #define ERTS_SIG_Q_OP_PERSISTENT_MON_MSG 9
122 #define ERTS_SIG_Q_OP_IS_ALIVE 10
123 #define ERTS_SIG_Q_OP_PROCESS_INFO 11
124 #define ERTS_SIG_Q_OP_SYNC_SUSPEND 12
125 #define ERTS_SIG_Q_OP_RPC 13
126 #define ERTS_SIG_Q_OP_DIST_SPAWN_REPLY 14
127 #define ERTS_SIG_Q_OP_ALIAS_MSG 15
128 #define ERTS_SIG_Q_OP_RECV_MARK 16
129 #define ERTS_SIG_Q_OP_UNLINK_ACK 17
130 #define ERTS_SIG_Q_OP_ADJ_MSGQ ERTS_SIG_Q_OP_MAX /* shares the value of ERTS_SIG_Q_OP_MAX (18) */
131
132 #define ERTS_SIG_Q_TYPE_MAX (ERTS_MON_LNK_TYPE_MAX + 10) /* highest type; the types below are numbered directly above ERTS_MON_LNK_TYPE_MAX */
133
134 #define ERTS_SIG_Q_TYPE_UNDEFINED \
135 (ERTS_MON_LNK_TYPE_MAX + 1)
136 #define ERTS_SIG_Q_TYPE_DIST_LINK \
137 (ERTS_MON_LNK_TYPE_MAX + 2)
138 #define ERTS_SIG_Q_TYPE_GEN_EXIT \
139 (ERTS_MON_LNK_TYPE_MAX + 3)
140 #define ERTS_SIG_Q_TYPE_DIST_PROC_DEMONITOR \
141 (ERTS_MON_LNK_TYPE_MAX + 4)
142 #define ERTS_SIG_Q_TYPE_ADJUST_TRACE_INFO \
143 (ERTS_MON_LNK_TYPE_MAX + 5)
144 #define ERTS_SIG_Q_TYPE_DIST \
145 (ERTS_MON_LNK_TYPE_MAX + 6)
146 #define ERTS_SIG_Q_TYPE_HEAP \
147 (ERTS_MON_LNK_TYPE_MAX + 7)
148 #define ERTS_SIG_Q_TYPE_OFF_HEAP \
149 (ERTS_MON_LNK_TYPE_MAX + 8)
150 #define ERTS_SIG_Q_TYPE_HEAP_FRAG \
151 (ERTS_MON_LNK_TYPE_MAX + 9)
152 #define ERTS_SIG_Q_TYPE_CLA \
153 ERTS_SIG_Q_TYPE_MAX /* shares the value of ERTS_SIG_Q_TYPE_MAX */
154
155 #define ERTS_SIG_IS_DIST_ALIAS_MSG_TAG(Tag) \
156 ((Tag) == ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_ALIAS_MSG, \
157 ERTS_SIG_Q_TYPE_DIST, \
158 0)) /* non-zero if Tag is the alias-message tag of type DIST with xtra 0 */
159 #define ERTS_SIG_IS_DIST_ALIAS_MSG(sig) \
160 ERTS_SIG_IS_DIST_ALIAS_MSG_TAG(((ErtsSignal *) (sig))->common.tag) /* same test applied to the tag of signal 'sig' */
161
162 #define ERTS_SIG_IS_OFF_HEAP_ALIAS_MSG_TAG(Tag) \
163 ((Tag) == ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_ALIAS_MSG, \
164 ERTS_SIG_Q_TYPE_OFF_HEAP, \
165 0)) /* non-zero if Tag is the alias-message tag of type OFF_HEAP with xtra 0 */
166 #define ERTS_SIG_IS_OFF_HEAP_ALIAS_MSG(sig) \
167 ERTS_SIG_IS_OFF_HEAP_ALIAS_MSG_TAG(((ErtsSignal *) (sig))->common.tag) /* same test applied to the tag of signal 'sig' */
168
169 #define ERTS_SIG_IS_HEAP_ALIAS_MSG_TAG(Tag) \
170 ((Tag) == ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_ALIAS_MSG, \
171 ERTS_SIG_Q_TYPE_HEAP, \
172 0)) /* non-zero if Tag is the alias-message tag of type HEAP with xtra 0 */
173 #define ERTS_SIG_IS_HEAP_ALIAS_MSG(sig) \
174 ERTS_SIG_IS_HEAP_ALIAS_MSG_TAG(((ErtsSignal *) (sig))->common.tag) /* same test applied to the tag of signal 'sig' */
175
176 #define ERTS_SIG_IS_HEAP_FRAG_ALIAS_MSG_TAG(Tag) \
177 ((Tag) == ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_ALIAS_MSG, \
178 ERTS_SIG_Q_TYPE_HEAP_FRAG, \
179 0)) /* non-zero if Tag is the alias-message tag of type HEAP_FRAG with xtra 0 */
180 #define ERTS_SIG_IS_HEAP_FRAG_ALIAS_MSG(sig) \
181 ERTS_SIG_IS_HEAP_FRAG_ALIAS_MSG_TAG(((ErtsSignal *) (sig))->common.tag) /* same test applied to the tag of signal 'sig' */
182
183 #define ERTS_RECV_MARKER_TAG \
184 (ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_RECV_MARK, \
185 ERTS_SIG_Q_TYPE_UNDEFINED, 0)) /* tag value identifying a receive marker */
186 #define ERTS_SIG_IS_RECV_MARKER(Sig) \
187 (((ErtsSignal *) (Sig))->common.tag == ERTS_RECV_MARKER_TAG) /* non-zero if Sig carries ERTS_RECV_MARKER_TAG */
188
189 #define ERTS_RECV_MARKER_PASS_MAX 4 /* NOTE(review): usage is not visible in this header -- document at the point of use */
190
191 typedef struct { /* unlink op signal; see erts_proc_sig_make_unlink_op() */
192 ErtsSignalCommon common; /* common signal fields */
193 Eterm from; /* id of the entity sending the unlink signal (cf. 'from' of erts_proc_sig_make_unlink_op()) */
194 Uint64 id; /* unlink identifier of the operation (cf. erts_proc_sig_new_unlink_id()) */
195 } ErtsSigUnlinkOp;
196
197 #define ERTS_SIG_HANDLE_REDS_MAX_PREFERED (CONTEXT_REDS/40) /* 1/40 of CONTEXT_REDS; NOTE(review): presumably the preferred reduction budget for signal handling -- confirm at the point of use */
198
199 #ifdef ERTS_SUPPORT_OLD_RECV_MARK_INSTRS /* the declaration below only exists when this is defined */
200 extern Eterm erts_old_recv_marker_id; /* defined elsewhere; its value is not visible in this header */
201 #endif
202
203 #ifdef ERTS_PROC_SIG_HARD_DEBUG /* hard debug enabled: the macros call the checker functions with __FILE__/__LINE__ */
204 # define ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE(P) \
205 ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE__((P), "")
206 # define ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(P, QL) \
207 ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE__((P), (QL), "")
208 # define ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE__(P, What) \
209 erts_proc_sig_hdbg_check_in_queue((P), (What), __FILE__, __LINE__)
210 # define ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE__(P, QL, What) \
211 erts_proc_sig_hdbg_check_priv_queue((P), (QL), (What), __FILE__, __LINE__)
212 struct process;
213 void erts_proc_sig_hdbg_check_priv_queue(struct process *c_p, int qlock,
214 char *what, char *file, int line);
215 void erts_proc_sig_hdbg_check_in_queue(struct process *c_p, char *what,
216 char *file, int line);
217 #else /* !ERTS_PROC_SIG_HARD_DEBUG: all four check macros expand to nothing */
218 # define ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE(P)
219 # define ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(P, QL)
220 # define ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE__(P, What)
221 #define ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE__(P, QL, What)
222 #endif /* ERTS_PROC_SIG_HARD_DEBUG */
223
224 #ifdef ERTS_PROC_SIG_HARD_DEBUG_RECV_MARKER /* receive marker debug enabled: the macro calls the checker function */
225 #define ERTS_HDBG_CHK_RECV_MRKS(P) \
226 erl_proc_sig_hdbg_chk_recv_marker_block((P))
227 struct process;
228 void erl_proc_sig_hdbg_chk_recv_marker_block(struct process *c_p);
229 #else /* !ERTS_PROC_SIG_HARD_DEBUG_RECV_MARKER: the check expands to nothing */
230 #define ERTS_HDBG_CHK_RECV_MRKS(P)
231 #endif /* ERTS_PROC_SIG_HARD_DEBUG_RECV_MARKER */
232
233 #endif
234
235 #if !defined(ERTS_PROC_SIG_QUEUE_H__) && !defined(ERTS_PROC_SIG_QUEUE_TYPE_ONLY)
236 #define ERTS_PROC_SIG_QUEUE_H__
237
238 #include "erl_process.h"
239 #include "erl_bif_unique.h"
240
241 #define ERTS_SIG_Q_OP_BITS 8 /* width of the operation field */
242 #define ERTS_SIG_Q_OP_SHIFT 0 /* the operation field starts at bit 0 of the arity value */
243 #define ERTS_SIG_Q_OP_MASK ((1 << ERTS_SIG_Q_OP_BITS) - 1)
244
245 #define ERTS_SIG_Q_TYPE_BITS 8 /* width of the type field */
246 #define ERTS_SIG_Q_TYPE_SHIFT ERTS_SIG_Q_OP_BITS /* the type field follows directly after the operation field */
247 #define ERTS_SIG_Q_TYPE_MASK ((1 << ERTS_SIG_Q_TYPE_BITS) - 1)
248
249 #define ERTS_SIG_Q_NON_X_BITS__ (_HEADER_ARITY_OFFS \
250 + ERTS_SIG_Q_OP_BITS \
251 + ERTS_SIG_Q_TYPE_BITS) /* number of bits that are not available to the xtra field */
252
253 #define ERTS_SIG_Q_XTRA_BITS (32 - ERTS_SIG_Q_NON_X_BITS__) /* the xtra field gets what remains of 32 bits */
254 #define ERTS_SIG_Q_XTRA_SHIFT (ERTS_SIG_Q_OP_BITS \
255 + ERTS_SIG_Q_TYPE_BITS) /* the xtra field follows directly after the type field */
256 #define ERTS_SIG_Q_XTRA_MASK ((1 << ERTS_SIG_Q_XTRA_BITS) - 1)
257
258
259 #define ERTS_PROC_SIG_OP(Tag) \
260 ((int) (_unchecked_thing_arityval((Tag)) \
261 >> ERTS_SIG_Q_OP_SHIFT) & ERTS_SIG_Q_OP_MASK) /* extracts the operation field (ERTS_SIG_Q_OP_*) of Tag */
262
263 #define ERTS_PROC_SIG_TYPE(Tag) \
264 ((Uint16) (_unchecked_thing_arityval((Tag)) \
265 >> ERTS_SIG_Q_TYPE_SHIFT) & ERTS_SIG_Q_TYPE_MASK) /* extracts the type field of Tag */
266
267 #define ERTS_PROC_SIG_XTRA(Tag) \
268 ((Uint32) (_unchecked_thing_arityval((Tag)) \
269 >> ERTS_SIG_Q_XTRA_SHIFT) & ERTS_SIG_Q_XTRA_MASK) /* extracts the xtra field of Tag */
270
271 #define ERTS_PROC_SIG_MAKE_TAG(Op, Type, Xtra) \
272 (ASSERT(0 <= (Xtra) && (Xtra) <= ERTS_SIG_Q_XTRA_MASK), \
273 _make_header((((Type) & ERTS_SIG_Q_TYPE_MASK) \
274 << ERTS_SIG_Q_TYPE_SHIFT) \
275 | (((Op) & ERTS_SIG_Q_OP_MASK) \
276 << ERTS_SIG_Q_OP_SHIFT) \
277 | (((Xtra) & ERTS_SIG_Q_XTRA_MASK) \
278 << ERTS_SIG_Q_XTRA_SHIFT), \
279 _TAG_HEADER_EXTERNAL_PID)) /* packs Op, Type and Xtra into a header word; each argument is evaluated more than once or masked, so pass side-effect free values */
280
281
282 /*
283 * ERTS_SIG_Q_OP_MSGQ_LEN_OFFS_MARK is not an actual
284 * operation. It is kept at the top of the OP range
285 * (ERTS_SIG_Q_OP_MASK), larger than ERTS_SIG_Q_OP_MAX.
286 */
287 #define ERTS_SIG_Q_OP_MSGQ_LEN_OFFS_MARK ERTS_SIG_Q_OP_MASK
288
289 #define ERTS_PROC_SIG_MSGQ_LEN_OFFS_MARK \
290 ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_MSGQ_LEN_OFFS_MARK,0,0) /* complete tag for the mark: type 0, xtra 0 */
291
292 struct dist_entry_; /* forward declaration; used as 'struct dist_entry_ *' in a prototype below */
293
294 #define ERTS_PROC_HAS_INCOMING_SIGNALS(P) \
295 (!!(erts_atomic32_read_nob(&(P)->state) \
296 & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q))) /* 1 if ERTS_PSFLG_SIG_Q or ERTS_PSFLG_SIG_IN_Q is set in the state of process P; otherwise 0 */
297
298 /*
299 * Send operations of currently supported process signals follow...
300 */
301
302 /**
303 *
304 * @brief Send an exit signal to a process.
305 *
306 *
307 * @param[in] c_p Pointer to process struct of
308 * currently executing process.
309 *
310 * @param[in] from Identifier of sender.
311 *
312 * @param[in] to Identifier of local process
313 * to send signal to.
314 *
315 * @param[in] reason Exit reason.
316 *
317 * @param[in] token Seq trace token.
318 *
319 * @param[in] normal_kills If non-zero, a normal exit
320 * reason will also kill the receiver
321 * if it is not trapping exits.
322 *
323 */
324 void
325 erts_proc_sig_send_exit(Process *c_p, Eterm from, Eterm to,
326 Eterm reason, Eterm token, int normal_kills);
327
328 /**
329 *
330 * @brief Send an exit signal to a process.
331 *
332 * This function is used when the exit signal arrives via the
333 * distribution. NOTE(review): presumably used instead of
334 * erts_proc_sig_send_exit(); earlier text named the link variant -- confirm.
335 *
336 * @param[in] dep Distribution entry of channel
337 * that the signal arrived on.
338 *
339 * @param[in] from Identifier of sender.
340 *
341 * @param[in] to Identifier of receiver.
342 *
343 * @param[in] dist_ext The exit reason in external term format
344 *
345 * @param[in] hfrag Heap frag with trace token and dist_ext
346 * iff available, otherwise NULL.
347 *
348 * @param[in] reason Exit reason.
349 *
350 * @param[in] token Seq trace token.
351 *
352 */
353 void
354 erts_proc_sig_send_dist_exit(DistEntry *dep,
355 Eterm from, Eterm to,
356 ErtsDistExternal *dist_ext,
357 ErlHeapFragment *hfrag,
358 Eterm reason, Eterm token);
359
360 /**
361 *
362 * @brief Send an exit signal, due to a broken link, to a process.
363 *
364 *
365 * @param[in] c_p Pointer to process struct of
366 * currently executing process.
367 *
368 * @param[in] from Identifier of sender.
369 *
370 * @param[in] lnk Pointer to link structure
371 * from the sending side. It
372 * should contain information
373 * about the receiver.
374 *
375 * @param[in] reason Exit reason.
376 *
377 * @param[in] token Seq trace token.
378 *
379 */
380 void
381 erts_proc_sig_send_link_exit(Process *c_p, Eterm from, ErtsLink *lnk,
382 Eterm reason, Eterm token);
383
384 /**
385 *
386 * @brief Send a link signal to a process.
387 *
388 *
389 * @param[in] c_p Pointer to process struct of
390 * currently executing process.
391 *
392 * @param[in] to Identifier of receiver.
393 *
394 * @param[in] lnk Pointer to link structure to
395 * insert on receiver side.
396 *
397 * @return A non-zero value if the
398 * signal was successfully
399 * sent. If zero is returned,
400 * the signal was not sent
401 * due to the receiver not
402 * existing. The sender then
403 * needs to deallocate the
404 * link structure.
405 *
406 */
407 int
408 erts_proc_sig_send_link(Process *c_p, Eterm to, ErtsLink *lnk);
409
410 /**
411 *
412 * @brief Create a new unlink identifier
413 *
414 * The newly created unlink identifier is to be used in an
415 * unlink operation.
416 *
417 * @param[in] c_p Pointer to process struct of
418 * currently executing process.
419 *
420 * @return A new 64-bit unlink identifier,
421 * unique in the context of the
422 * calling process. The identifier
423 * may have any value except zero.
424 */
425 ERTS_GLB_INLINE Uint64 erts_proc_sig_new_unlink_id(Process *c_p);
426
427 /**
428 *
429 * @brief Create an unlink op signal structure
430 *
431 * The structure will contain a newly created unlink
432 * identifier to be used in the operation.
433 *
434 * @param[in] c_p Pointer to process struct of
435 * currently executing process
436 * ('from' is a process
437 * identifier), or NULL if not
438 * called in the context of an
439 * executing process ('from' is
440 * a port identifier).
441 *
442 * @param[in] from Id (as an erlang term) of
443 * entity sending the unlink
444 * signal.
445 *
446 * @return A pointer to the unlink op structure
447 * (see also erts_proc_sig_destroy_unlink_op()).
448 */
449 ErtsSigUnlinkOp *
450 erts_proc_sig_make_unlink_op(Process *c_p, Eterm from);
451
452 /**
453 *
454 * @brief Destroy an unlink op signal structure
455 *
456 * @param[in] sulnk A pointer to the unlink op
457 * structure to destroy.
458 */
459 void
460 erts_proc_sig_destroy_unlink_op(ErtsSigUnlinkOp *sulnk);
461
462 /**
463 *
464 * @brief Send an unlink signal to a process.
465 * NOTE(review): the Uint64 return value is not documented here;
466 * presumably the unlink identifier of the operation -- confirm.
467 * @param[in] c_p Pointer to process struct of
468 * currently executing process.
469 *
470 * @param[in] from Id (as an erlang term) of
471 * entity sending the unlink
472 * signal.
473 *
474 * @param[in] lnk Pointer to link structure from
475 * the sending side. It should
476 * contain information about
477 * the receiver.
478 */
479 Uint64
480 erts_proc_sig_send_unlink(Process *c_p, Eterm from, ErtsLink *lnk);
481
482 /**
483 *
484 * @brief Send an unlink acknowledgment signal to a process.
485 *
486 *
487 * @param[in] c_p Pointer to process struct of
488 * currently executing process.
489 *
490 * @param[in] from Id (as an erlang term) of
491 * entity sending the unlink
492 * acknowledgment signal.
493 *
494 * @param[in] sulnk A pointer to the unlink op
495 * structure. This structure
496 * was typically received by
497 * the caller in an unlink
498 * signal.
499 */
500 void
501 erts_proc_sig_send_unlink_ack(Process *c_p, Eterm from,
502 ErtsSigUnlinkOp *sulnk);
503
504 /**
505 *
506 * @brief Send an exit signal, due to a broken link, to a process.
507 *
508 * This function is used instead of erts_proc_sig_send_link_exit()
509 * when the signal arrives via the distribution and
510 * therefore no link structure is available.
511 *
512 * @param[in] dep Distribution entry of channel
513 * that the signal arrived on.
514 *
515 * @param[in] from Identifier of sender.
516 *
517 * @param[in] to Identifier of receiver.
518 *
519 * @param[in] dist_ext The exit reason in external term format.
520 *
521 * @param[in] hfrag Heap frag with trace token and dist_ext
522 * if available; otherwise, NULL.
523 *
524 * @param[in] reason Exit reason.
525 *
526 * @param[in] token Seq trace token.
527 *
528 */
529 void
530 erts_proc_sig_send_dist_link_exit(struct dist_entry_ *dep,
531 Eterm from, Eterm to,
532 ErtsDistExternal *dist_ext,
533 ErlHeapFragment *hfrag,
534 Eterm reason, Eterm token);
535
536 /**
537 *
538 * @brief Send an unlink signal to a local process.
539 *
540 * This function is used instead of erts_proc_sig_send_unlink()
541 * when the signal arrives via the distribution.
542 *
543 * @param[in] dep Distribution entry of channel
544 * that the signal arrived on.
545 * @param[in] conn_id NOTE(review): not documented; presumably the connection id of 'dep' -- confirm.
546 * @param[in] from Identifier of sender.
547 *
548 * @param[in] to Identifier of receiver.
549 *
550 * @param[in] id Identifier of unlink operation.
551 */
552 void
553 erts_proc_sig_send_dist_unlink(DistEntry *dep, Uint32 conn_id,
554 Eterm from, Eterm to, Uint64 id);
555
556 /**
557 *
558 * @brief Send an unlink acknowledgment signal to a local process.
559 *
560 * This function is used instead of erts_proc_sig_send_unlink_ack()
561 * when the signal arrives via the distribution.
562 *
563 * @param[in] c_p Pointer to process struct of
564 * currently executing process or
565 * NULL if not called in the context
566 * of an executing process.
567 *
568 * @param[in] dep Distribution entry of channel
569 * that the signal arrived on.
570 * @param[in] conn_id NOTE(review): not documented; presumably the connection id of 'dep' -- confirm.
571 * @param[in] from Identifier of sender.
572 *
573 * @param[in] to Identifier of receiver.
574 *
575 * @param[in] id Identifier of unlink operation.
576 */
577 void
578 erts_proc_sig_send_dist_unlink_ack(Process *c_p, DistEntry *dep,
579 Uint32 conn_id, Eterm from, Eterm to,
580 Uint64 id);
581
582 /**
583 *
584 * @brief Send a monitor down signal to a process.
585 *
586 * @param[in] mon Pointer to target monitor
587 * structure from the sending
588 * side. It should contain
589 * information about the receiver.
590 *
591 * @param[in] reason Exit reason.
592 *
593 */
594 void
595 erts_proc_sig_send_monitor_down(ErtsMonitor *mon, Eterm reason);
596
597 /**
598 *
599 * @brief Send a demonitor signal to a process.
600 *
601 * @param[in] mon Pointer to origin monitor
602 * structure from the sending
603 * side. It should contain
604 * information about the receiver.
605 *
606 *
607 *
608 */
609 void
610 erts_proc_sig_send_demonitor(ErtsMonitor *mon);
611
612 /**
613 *
614 * @brief Send a monitor signal to a process.
615 *
616 * @param[in] mon Pointer to target monitor
617 * structure to insert on
618 * receiver side.
619 *
620 * @param[in] to Identifier of receiver.
621 *
622 * @return A non-zero value if the
623 * signal was successfully
624 * sent. If zero is returned,
625 * the signal was not sent
626 * due to the receiver not
627 * existing. The sender then
628 * needs to deallocate the
629 * monitor structure.
630 *
631 */
632 int
633 erts_proc_sig_send_monitor(ErtsMonitor *mon, Eterm to);
634
635 /**
636 *
637 * @brief Send a monitor down signal to a process.
638 *
639 * This function is used instead of erts_proc_sig_send_monitor_down()
640 * when the signal arrives via the distribution and
641 * therefore no monitor structure is available.
642 *
643 * @param[in] dep Pointer to distribution entry
644 * of channel that the signal
645 * arrived on.
646 *
647 * @param[in] ref Reference identifying the monitor.
648 *
649 * @param[in] from Identifier of sender.
650 *
651 * @param[in] to Identifier of receiver.
652 *
653 * @param[in] dist_ext The exit reason in external term format.
654 *
655 * @param[in] hfrag Heap frag with trace token and dist_ext
656 * if available; otherwise, NULL.
657 *
658 * @param[in] reason Exit reason.
659 *
660 */
661 void
662 erts_proc_sig_send_dist_monitor_down(DistEntry *dep, Eterm ref,
663 Eterm from, Eterm to,
664 ErtsDistExternal *dist_ext,
665 ErlHeapFragment *hfrag,
666 Eterm reason);
667
668 /**
669 *
670 * @brief Send a demonitor signal to a process.
671 *
672 * This function is used instead of erts_proc_sig_send_demonitor()
673 * when the signal arrives via the distribution and
674 * therefore no monitor structure is available.
675 *
676 * @param[in] to Identifier of receiver.
677 *
678 * @param[in] ref Reference identifying the monitor.
679 *
680 */
681 void
682 erts_proc_sig_send_dist_demonitor(Eterm to, Eterm ref);
683
684 /**
685 *
686 * @brief Send a persistent monitor triggered signal to a process.
687 *
688 * Used by monitors that are not automatically disabled, such as,
689 * for example, 'time_offset' monitors.
690 *
691 * @param[in] type Monitor type.
692 *
693 * @param[in] key Monitor key.
694 *
695 * @param[in] from Identifier of sender.
696 *
697 * @param[in] to Identifier of receiver.
698 *
699 * @param[in] msg Message template.
700 *
701 * @param[in] msg_sz Heap size of message template.
702 *
703 */
704 void
705 erts_proc_sig_send_persistent_monitor_msg(Uint16 type, Eterm key,
706 Eterm from, Eterm to,
707 Eterm msg, Uint msg_sz);
708
709 /**
710 *
711 * @brief Send a trace change signal to a process.
712 *
713 * @param[in] to Identifier of receiver.
714 *
715 * @param[in] on Trace flags to enable.
716 *
717 * @param[in] off Trace flags to disable.
718 *
719 * @param[in] tracer Tracer to set. If THE_NON_VALUE, the
720 * tracer will not be changed.
721 *
722 */
723 void
724 erts_proc_sig_send_trace_change(Eterm to, Uint on, Uint off,
725 Eterm tracer);
726
727 /**
728 *
729 * @brief Send a group leader signal to a process.
730 *
731 * Sets the group leader of the receiving process. If sent locally,
732 * a response message '{Ref, Result}' is sent to the original
733 * sender when performed, where Ref is the reference passed
734 * as 'ref' argument, and Result is either 'true' or 'badarg'.
735 *
736 * @param[in] c_p Pointer to process struct of
737 * currently executing process.
738 * NULL if signal arrived via
739 * distribution.
740 *
741 * @param[in] to Identifier of receiver.
742 *
743 * @param[in] gl Identifier of new group leader.
744 *
745 * @param[in] ref Reference to use in response
746 * message to locally sending
747 * process (i.e., c_p when c_p
748 * is non-null).
749 *
750 */
751 void
752 erts_proc_sig_send_group_leader(Process *c_p, Eterm to, Eterm gl,
753 Eterm ref);
754
755 /**
756 *
757 * @brief Send an 'is process alive' signal to a process.
758 *
759 * A response message '{Ref, Result}' is sent to the
760 * sender when performed, where Ref is the reference passed
761 * as 'ref' argument, and Result is either 'true' or 'false'.
762 *
763 * @param[in] c_p Pointer to process struct of
764 * currently executing process.
765 * NULL if signal arrived via
766 * distribution.
767 *
768 * @param[in] to Identifier of receiver.
769 *
770 * @param[in] ref Reference to use in response
771 * message to the sending
772 * process (i.e., c_p).
773 *
774 * @returns A value != 0 if the request
775 * was sent; otherwise, 0. If
776 * the request was not sent, the
777 * process did not exist.
778 */
779 int
780 erts_proc_sig_send_is_alive_request(Process *c_p, Eterm to,
781 Eterm ref);
782
783 /**
784 *
785 * @brief Send a 'process info request' signal to a process.
786 *
787 * A response message '{Ref, Result}' is sent to the
788 * sender when performed, where Ref is the reference passed
789 * as 'ref' argument, and Result corresponds to return result
790 * from erlang:process_info/[1,2].
791 *
792 * @param[in] c_p Pointer to process struct of
793 * currently executing process.
794 * NULL if signal arrived via
795 * distribution.
796 *
797 * @param[in] to Identifier of receiver.
798 *
799 * @param[in] item_ix Info index array to pass to
800 * erts_process_info()
801 *
802 * @param[in] len Length of info index array
803 *
804 * @param[in] need_msgq_len Non-zero if message queue
805 * length is needed; otherwise,
806 * zero. If non-zero, sig_qs.len
807 * will be set to correspond
808 * to the message queue length
809 * before call to
810 * erts_process_info()
811 *
812 * @param[in] flags Flags to pass to
813 * erts_process_info()
814 *
815 * @param[in] reserve_size Heap size that is known to
816 * be needed. May not be correct
817 * though.
818 *
819 * @param[in] ref Reference to use in response
820 * message to the sending
821 * process (i.e., c_p).
822 * NOTE(review): the int return value is not documented here -- confirm its meaning.
823 */
824 int
825 erts_proc_sig_send_process_info_request(Process *c_p,
826 Eterm to,
827 int *item_ix,
828 int len,
829 int need_msgq_len,
830 int flags,
831 Uint reserve_size,
832 Eterm ref);
833
834 /**
835 *
836 * @brief Send a 'sync suspend' signal to a process.
837 *
838 * A response message '{Tag, Reply}' is sent to the
839 * sender when performed, where Tag is the term passed
840 * as 'tag' argument. Reply is either 'suspended',
841 * 'not_suspended', 'exited' if the operation is
842 * asynchronous; otherwise, the 'reply' argument or
843 * 'badarg' if process terminated.
844 *
845 * This signal does *not* change the suspend state; it only
846 * reads the state and replies with it. This signal is typically
847 * sent after a suspend request (monitor of suspend type)
848 * signal has been sent to the process in order to get a
849 * response when the suspend monitor has been processed.
850 *
851 * @param[in] c_p Pointer to process struct of
852 * currently executing process.
853 *
854 * @param[in] to Identifier of receiver.
855 *
856 * @param[in] tag Tag to use in response
857 * message to the sending
858 * process (i.e., c_p).
859 *
860 * @param[in] reply Reply to send if this
861 * is a synchronous operation;
862 * otherwise, THE_NON_VALUE.
863 */
864 void
865 erts_proc_sig_send_sync_suspend(Process *c_p, Eterm to,
866 Eterm tag, Eterm reply);
867
868 /**
869 *
870 * @brief Send an 'rpc' signal to a process.
871 *
872 * The function 'func' will be executed in the
873 * context of the receiving process. A response
874 * message '{Ref, Result}' is sent to the sender
875 * when 'func' has been called. 'Ref' is the reference
876 * returned by this function and 'Result' is the
877 * term returned by 'func'. If the return value of
878 * 'func' is not an immediate term, 'func' has to
879 * allocate a heap fragment where the result is stored
880 * and update the heap fragment pointer passed as its
881 * last (ErlHeapFragment **) argument to point to it.
882 *
883 * If this function returns a reference, 'func' will
884 * be called in the context of the receiver. However,
885 * note that this might happen when the receiver is in
886 * an exiting state. The caller of this function
887 * *unconditionally* has to enter a receive that matches
888 * on the returned reference in all clauses as next
889 * receive; otherwise, bad things will happen!
890 *
891 * If THE_NON_VALUE is returned, the receiver did not
892 * exist. The signal was not sent, and no specific
893 * receive has to be entered by the caller.
894 *
895 * @param[in] c_p Pointer to process struct of
896 * currently executing process.
897 *
898 * @param[in] to Identifier of receiver process.
899 *
900 * @param[in] reply Non-zero if a reply is wanted.
901 *
902 * @param[in] func Function to execute in the
903 * context of the receiver.
904 * First argument will be a
905 * pointer to the process struct
906 * of the receiver process.
907 * Second argument will be 'arg'
908 * (see below). Third argument is
909 * an 'int *' (NOTE(review): not documented here; presumably reductions -- confirm).
910 * Fourth argument will be a pointer to a pointer to a heap
911 * fragment for storage of the result
912 * returned from 'func' (i.e. an 'out' parameter).
913 *
914 * @param[in] arg Void pointer to argument
915 * to pass as second argument
916 * in call of 'func'.
917 *
918 * @returns If the request was sent,
919 * an internal ordinary
920 * reference; otherwise,
921 * THE_NON_VALUE (non-existing
922 * receiver).
923 */
924 Eterm
925 erts_proc_sig_send_rpc_request(Process *c_p,
926 Eterm to,
927 int reply,
928 Eterm (*func)(Process *, void *, int *, ErlHeapFragment **),
929 void *arg);
930
931 int
932 erts_proc_sig_send_dist_spawn_reply(Eterm node,
933 Eterm ref,
934 Eterm to,
935 ErtsLink *lnk,
936 Eterm result,
937 Eterm token); /* NOTE(review): undocumented; presumably sends the signal with operation ERTS_SIG_Q_OP_DIST_SPAWN_REPLY -- document parameters and the int return value */
938
939 /**
940 *
941 * @brief Send a 'copy literal area request' signal to
942 * a process.
943 *
944 * The receiver will scan its message queue and then the rest
945 * of the process. After the operation has been performed it will
946 * reply with a '{copy_literals, ReqID, Res}' message to the
947 * sender where 'Res' equals 'ok' if the receiver is clean or
948 * 'need_gc' if a literal GC is needed.
949 *
950 * Should only be called by the literal-area-collector process!
951 *
952 * @param[in] c_p Pointer to process struct of
953 * currently executing process.
954 *
955 * @param[in] to Identifier of receiver.
956 *
957 * @param[in] req_id Request ID (ReqID) term.
958 */
959 void
960 erts_proc_sig_send_cla_request(Process *c_p, Eterm to, Eterm req_id);
961
962
963 /**
964 *
965 * @brief Send a 'move message queue off heap' signal to
966 * the sending process itself.
967 *
968 * When received, all on heap messages will be moved off heap.
969 *
970 * @param[in] c_p Pointer to process struct of
971 * currently executing process.
972 *
973 * @param[in] to Identifier of receiver.
974 *
975 */
976 void
977 erts_proc_sig_send_move_msgq_off_heap(Process *c_p, Eterm to);
978
979 /*
980 * End of send operations of currently supported process signals.
981 */
982
983
984 /**
985 *
986 * @brief Handle incoming signals.
987 *
988 * Called by an ordinary scheduler in order to handle incoming
989 * signals for a process. The work is done on the middle part
990 * of the signal queue. The maximum amount of signals handled
991 * is limited by the amount of reductions given when calling.
992 * Note that a reduction does not necessarily map to a signal.
993 *
994 * @param[in] c_p Pointer to process struct of
995 * currently executing process.
996 *
997 * @param[out] statep Pointer to process state after
998 * signal handling. May not be NULL.
999 *
1000 * @param[in,out] redsp Pointer to an integer containing
1001 * reductions. On input, the amount
1002 * of preferred reductions to be
1003 * used by the call. On output, the
1004 * amount of reductions consumed.
1005 *
1006 * @param[in] max_reds Absolute maximum of reductions
1007 * to use. If the process cannot
1008 * make progress after the preferred
1009 * amount of reductions has been
1010 * consumed, signal handling may
1011 * proceed up to a maximum of
1012 * 'max_reds' in order to make
1013 * the process able to proceed
1014 * with other tasks after handling
1015 * has finished.
1016 *
1017 * @param[in] local_only If zero, new signals may be
1018 * fetched from the outer queue and
1019 * put in the middle queue before
1020 * signal handling is performed. If
1021 * non-zero, no new signals will be
1022 * fetched before handling begins.
1023 *
1024 * @return Returns a non-zero value when
1025 * no more signals to handle remain
1026 * in the middle queue. A zero
1027 * return value means that there
1028 * remain signals in the middle
1029 * queue.
1030 */
1031 int
1032 erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
1033 int *redsp, int max_reds,
1034 int local_only);
1035
1036 /**
1037 *
1038 * @brief Handle remaining signals for an exiting process
1039 *
1040 * Called as part of termination of a process. It will handle
1041 * remaining signals.
1042 *
1043 * @param[in] c_p Pointer to process struct of
1044 * currently executing process.
1045 *
1046 * @param[in,out] redsp Pointer to an integer containing
1047 * reductions. On input, the amount
1048 * of maximum reductions to be
1049 * used by the call. On output, the
1050 * amount of reductions consumed.
1051 * NOTE(review): 'pend_spawn_mon_pp' and 'reason' are not documented here -- confirm their meaning.
1052 * @return Returns a non-zero value when
1053 * no more signals to handle remain
1054 * in the middle queue. A zero
1055 * return value means that there
1056 * remain signals in the middle
1057 * queue.
1058 */
1059 int
1060 erts_proc_sig_handle_exit(Process *c_p, Sint *redsp,
1061 ErtsMonitor **pend_spawn_mon_pp,
1062 Eterm reason);
1063
1064 /**
1065 *
1066 * @brief Helper for loop_rec instruction.
1067 *
 * This function should only be called from the loop_rec
 * instruction (or equivalents). It is called when loop_rec
 * reaches the end of the inner queue (which is the only
 * part of the signal queue that receive is allowed to
 * operate on). When called, this function tries to make
 * more messages available in the inner queue. This is done
 * by fetching signals from the outer queue to the middle
 * queue and/or processing signals in the middle queue.
1076 *
1077 * @param[in] c_p Pointer to process struct of
1078 * currently executing process.
1079 *
1080 * @param[in] fcalls Content of FCALLS in
1081 * process_main()
1082 *
1083 * @param[in] neg_o_reds Content of neg_o_reds in
1084 * process_main()
1085 *
1086 * @param[out] msgpp Pointer to pointer to next
1087 * available message to process.
1088 * If *msgpp == NULL, no more
1089 * messages are available.
1090 *
1091 * @param[out] get_outp Pointer to an integer
1092 * indicating how to respond
1093 * if no more messages are
1094 * available (msgpp). If integer
1095 * is set to zero, loop_rec
1096 * should jump to an appropriate
 * wait instruction. If zero,
 * the message queue lock has
 * remained locked since the test
 * for more messages was done.
 * If the integer is set to a
 * value larger than zero, the
 * process exited. If the integer
1104 * is set to a value less than
1105 * zero, the process is required
1106 * to yield.
1107 *
1108 *
1109 * @return The amount of reductions
1110 * consumed.
1111 *
1112 */
1113 int
1114 erts_proc_sig_receive_helper(Process *c_p, int fcalls,
1115 int neg_o_reds, ErtsMessage **msgpp,
1116 int *get_outp);
1117
1118 /**
1119 *
1120 * @brief Fetch signals from the outer queue
1121 *
1122 * Fetches signals from outer queue and places them in the
1123 * middle queue ready for signal handling. If the middle
1124 * queue is empty, only message signals were present in the
1125 * outer queue, and no receive tracing has been enabled on
1126 * the process, the middle queue is bypassed and messages
1127 * are delivered directly to the inner queue instead.
1128 *
 * @param[in] p Pointer to process struct of
1130 * currently executing process.
1131 * @returns Amount of message signals in
1132 * inner plus middle signal
1133 * queues after fetch completed
1134 * (NOT the message queue
1135 * length).
1136 */
1137 ERTS_GLB_INLINE Sint erts_proc_sig_fetch(Process *p);
1138
1139 /**
1140 *
1141 * @brief Get amount of messages in private queues
1142 *
1143 * @param[in] c_p Pointer to process struct of
1144 * currently executing process.
1145 *
1146 * @returns Amount of message signals in
1147 * inner plus middle signal
1148 * queues after fetch completed
1149 * (NOT the message queue
1150 * length).
1151 */
1152 Sint
1153 erts_proc_sig_privqs_len(Process *c_p);
1154
1155
1156 /**
1157 * @brief Enqueue list of signals on process.
1158 *
1159 * Message queue must be locked on receiving process.
1160 *
1161 * @param rp Receiving process.
1162 * @param first First signal in list.
1163 * @param last Last signal in list.
1164 * @param last_next Pointer to next-pointer to last non-message signal
1165 * or NULL if no non-message signal after 'first'.
1166 * @param msg_cnt Number of message signals in list.
1167 * @param in_state 'state' of rp.
1168 *
1169 * @return 'state' of rp.
1170 */
1171 erts_aint32_t
1172 erts_enqueue_signals(Process *rp, ErtsMessage *first,
1173 ErtsMessage **last, ErtsMessage **last_next,
1174 Uint msg_cnt,
1175 erts_aint32_t in_state);
1176
1177 /**
1178 *
1179 * @brief Flush pending signal.
1180 *
1181 */
1182 void
1183 erts_proc_sig_send_pending(ErtsSchedulerData* esdp);
1184
1185
1186 void
1187 erts_proc_sig_send_to_alias(Process *c_p, Eterm from, Eterm to,
1188 Eterm msg, Eterm token);
1189
1190 void
1191 erts_proc_sig_send_dist_to_alias(Eterm alias, ErtsDistExternal *edep,
1192 ErlHeapFragment *hfrag, Eterm token);
1193
1194 /**
1195 *
1196 * @brief Schedule process to handle enqueued signal(s).
1197 *
1198 * @param rp Receiving process.
1199 * @param state 'state' of rp.
1200 * @param enable_flag Additional state flags to enable, like
1201 * ERTS_PSFLG_ACTIVE if message has been enqueued.
1202 */
1203 ERTS_GLB_INLINE void erts_proc_notify_new_sig(Process* rp, erts_aint32_t state,
1204 erts_aint32_t enable_flag);
1205
1206 void erts_make_dirty_proc_handled(Eterm pid, erts_aint32_t state,
1207 erts_aint32_t prio);
1208
1209
/*
 * Element type of the array passed as 'mip' to
 * erts_proc_sig_prep_msgq_for_inspection().
 */
typedef struct {
    Uint size;          /* presumably a size associated with 'msgp';
                         * unit and what is measured are not visible
                         * here -- TODO confirm */
    ErtsMessage *msgp;  /* pointer to a message */
} ErtsMessageInfo;
1214
1215 /**
1216 *
1217 * @brief Prepare signal queue for inspection by process_info()
1218 *
1219 *
1220 * @param[in] c_p Pointer to process struct of
1221 * currently executing process.
1222 *
1223 * @param[in] rp Pointer to process struct of
1224 * process to inspect.
1225 *
1226 * @param[in] rp_locks Process locks held on 'rp'.
1227 *
1228 * @param[in] info_on_self Integer set to non-zero value
1229 * if caller is inspecting itself;
1230 * otherwise, zero.
1231 *
1232 * @param[in] mip Pointer to array of
1233 * ErtsMessageInfo structures.
1234 */
1235 Uint erts_proc_sig_prep_msgq_for_inspection(Process *c_p,
1236 Process *rp,
1237 ErtsProcLocks rp_locks,
1238 int info_on_self,
1239 ErtsMessageInfo *mip);
1240
1241 /**
1242 *
1243 * @brief Move message data of messages in private queues to heap
1244 *
1245 * Move message data of messages in private queues to the heap.
1246 * This is part of GC of processes that uses on-heap message
1247 * data.
1248 *
1249 * @param[in] c_p Pointer to process struct of
1250 * currently executing process.
1251 *
1252 */
1253 void erts_proc_sig_move_msgs_to_heap(Process *c_p);
1254
1255 /**
1256 *
1257 * @brief Size of signal in bytes.
1258 *
1259 * @param[in] sig Signal to inspect.
1260 *
1261 */
1262 Uint erts_proc_sig_signal_size(ErtsSignal *sig);
1263
1264
1265 /**
1266 *
1267 * @brief Clear seq trace tokens on all signals
1268 *
1269 * Assumes thread progress has been blocked!
1270 *
1271 * @param[in] c_p Pointer to process
1272 *
1273 */
1274 void
1275 erts_proc_sig_clear_seq_trace_tokens(Process *c_p);
1276
1277 /**
1278 *
1279 * @brief Handle pending suspend requests
1280 *
1281 * Should be called by processes when they stop
1282 * execution on a dirty scheduler if they have
1283 * pending suspend requests (i.e. when
1284 * ERTS_PROC_GET_PENDING_SUSPEND(c_p) != NULL).
1285 *
1286 * @param[in] c_p Pointer to executing
1287 * process
1288 */
1289 void
1290 erts_proc_sig_handle_pending_suspend(Process *c_p);
1291
1292 /**
1293 *
1294 * @brief Decode the reason term in an external signal
1295 *
1296 * Any distributed signal with a payload only has the control
1297 * message decoded by the dist entry. The final decode of the
1298 * payload is done by the process when it inspects the signal
1299 * by calling this function.
1300 *
 * This function handles both messages and link/monitor exits.
1302 *
1303 * Return true if the decode was successful, false otherwise.
1304 *
 * @param[in] proc Pointer to executing process
 *
 * @param[in] proc_locks Locks held by process. Should always be MAIN.
1308 *
1309 * @param[in] msgp The signal to decode
1310 *
1311 * @param[in] force_off_heap If the term should be forced to be off-heap
1312 */
1313 int
1314 erts_proc_sig_decode_dist(Process *proc, ErtsProcLocks proc_locks,
1315 ErtsMessage *msgp, int force_off_heap);
1316
1317 ErtsDistExternal *
1318 erts_proc_sig_get_external(ErtsMessage *msgp);
1319
1320 void
1321 erts_proc_sig_cleanup_non_msg_signal(ErtsMessage *sig);
1322
1323
1324 /**
1325 *
1326 * @brief Create and insert a receive marker at the end of the
1327 * signal queue of the calling process unless the
1328 * signal queue is empty.
1329 *
1330 *
1331 * @param[in] c_p Pointer to process struct of
1332 * currently executing process.
1333 *
1334 * @return A process unique integer
1335 * identifying the unbound
1336 * receive marker, or the atom
1337 * 'undefined' if no marker was
1338 * inserted.
1339 */
1340 ERTS_GLB_INLINE Eterm erts_msgq_recv_marker_insert(Process *c_p);
1341
1342 /**
1343 *
1344 * @brief Bind a previously inserted receive marker to a
1345 * reference.
1346 *
1347 *
1348 * @param[in] c_p Pointer to process struct of
1349 * currently executing process.
1350 *
1351 * @param[in] insert_id Receive marker identifier returned
1352 * by erts_msgq_recv_marker_insert().
1353 *
1354 * @param[in] bind_id An internal reference to bind
1355 * the receive marker to. Other
1356 * terms are allowed, but will
1357 * cause the receive marker
1358 * identified by insert_id to be
1359 * cleared. Note that the special
1360 * literal internal reference
1361 * 'erts_old_recv_marker_id' is
1362 * *not* allowed to be passed here!
1363 */
1364 ERTS_GLB_INLINE void erts_msgq_recv_marker_bind(Process *c_p,
1365 Eterm insert_id,
1366 Eterm bind_id);
1367
1368 /**
1369 *
 * @brief Create, insert, and bind a receive marker at the end
 * of the signal queue of the calling process unless
 * the signal queue is empty.
1373 *
1374 *
1375 * @param[in] c_p Pointer to process struct of
1376 * currently executing process.
1377 *
1378 * @param[in] id An internal reference to bind
1379 * the receive marker to. Other
1380 * terms are allowed, but will
1381 * be ignored.
1382 */
1383 ERTS_GLB_INLINE void erts_msgq_recv_marker_insert_bind(Process *c_p,
1384 Eterm id);
1385
1386
1387 /**
1388 *
1389 * @brief Set the message queue save pointer to the position
1390 * identified by the previously inserted receive marker.
1391 *
1392 *
1393 * @param[in] c_p Pointer to process struct of
1394 * currently executing process.
1395 *
1396 * @param[in] id Internal reference bound to
1397 * a receive marker. Other terms
1398 * are allowed but will be
1399 * ignored.
1400 */
1401 ERTS_GLB_INLINE void erts_msgq_recv_marker_set_save(Process *c_p, Eterm id);
1402
1403 /**
1404 *
1405 * @brief Clear receive marker corresponding to the argument
1406 * id.
1407 *
1408 *
1409 * @param[in] c_p Pointer to process struct of
1410 * currently executing process.
1411 *
1412 * @param[in] id Internal reference bound to
1413 * a receive marker or an insert
1414 * id. Other terms are allowed
1415 * but will be ignored.
1416 */
1417 ERTS_GLB_INLINE void erts_msgq_recv_marker_clear(Process *c_p, Eterm id);
1418
1419
1420 /**
1421 *
1422 * @brief Peek on next message (identified by save pointer) in
1423 * message queue.
1424 *
1425 *
1426 * @param[in] c_p Pointer to process struct of
1427 * currently executing process.
1428 *
1429 */
1430 ERTS_GLB_INLINE ErtsMessage *erts_msgq_peek_msg(Process *c_p);
1431
1432 /**
1433 *
1434 * @brief Remove a message from the message queue.
1435 *
1436 *
1437 * @param[in] c_p Pointer to process struct of
1438 * currently executing process.
1439 *
1440 * @param[in] msgp A pointer to the message to
1441 * remove from the message queue.
1442 *
1443 */
1444 ERTS_GLB_INLINE void erts_msgq_unlink_msg(Process *c_p,
1445 ErtsMessage *msgp);
1446
1447 /**
1448 *
1449 * @brief Set the save pointer to the start of the message queue.
1450 *
1451 *
1452 * @param[in] c_p Pointer to process struct of
1453 * currently executing process.
1454 *
1455 */
1456 ERTS_GLB_INLINE void erts_msgq_set_save_first(Process *c_p);
1457
1458 /**
1459 *
1460 * @brief Advance the save pointer to the next message in the
1461 * message queue.
1462 *
1463 *
1464 * @param[in] c_p Pointer to process struct of
1465 * currently executing process.
1466 *
1467 */
1468 ERTS_GLB_INLINE void erts_msgq_set_save_next(Process *c_p);
1469
1470 /**
1471 *
1472 * @brief Set the save pointer to the end of the message queue.
1473 *
1474 *
1475 * @param[in] c_p Pointer to process struct of
1476 * currently executing process.
1477 *
1478 */
1479 ERTS_GLB_INLINE void erts_msgq_set_save_end(Process *c_p);
1480
1481 /**
1482 *
1483 * @brief Cleanup private signal queues at termination of
1484 * process.
1485 *
1486 *
1487 * @param[in] c_p Pointer to process struct of
1488 * currently executing process.
1489 *
1490 */
1491 void erts_proc_sig_cleanup_queues(Process *c_p);
1492
1493
1494 /**
1495 * @brief Initialize this functionality
1496 */
1497 void erts_proc_sig_queue_init(void);
1498
1499 void
1500 erts_proc_sig_debug_foreach_sig(Process *c_p,
1501 void (*msg_func)(ErtsMessage *, void *),
1502 void (*oh_func)(ErlOffHeap *, void *),
1503 ErtsMonitorFunc mon_func,
1504 ErtsLinkFunc lnk_func,
1505 void (*ext_func)(ErtsDistExternal *, void *),
1506 void *arg);
1507
1508 extern Process *erts_dirty_process_signal_handler;
1509 extern Process *erts_dirty_process_signal_handler_high;
1510 extern Process *erts_dirty_process_signal_handler_max;
1511
1512 /* Helpers... */
1513 void erts_proc_sig_fetch__(Process *proc);
1514 Sint erts_proc_sig_fetch_msgq_len_offs__(Process *proc);
1515 ERTS_GLB_INLINE int erts_msgq_eq_recv_mark_id__(Eterm term1, Eterm term2);
1516 ERTS_GLB_INLINE void erts_msgq_recv_marker_set_save__(Process *c_p,
1517 ErtsRecvMarkerBlock *blkp,
1518 ErtsRecvMarker *markp,
1519 int ix);
1520 Eterm erts_msgq_recv_marker_create_insert(Process *c_p, Eterm id);
1521 void erts_msgq_recv_marker_create_insert_set_save(Process *c_p, Eterm id);
1522 ErtsMessage **erts_msgq_pass_recv_markers(Process *c_p,
1523 ErtsMessage **markpp);
1524 void erts_msgq_remove_leading_recv_markers(Process *c_p);
1525
/*
 * Index of receive marker MRKP within the marker[] array of block
 * BLKP, computed as a pointer difference from &marker[0] and cast
 * to int. MRKP is expected to point into (BLKP)->marker[].
 */
#define ERTS_RECV_MARKER_IX__(BLKP, MRKP) \
    ((int) ((MRKP) - &(BLKP)->marker[0]))
1528
1529 #if ERTS_GLB_INLINE_INCL_FUNC_DEF
1530
1531 ERTS_GLB_INLINE Uint64
erts_proc_sig_new_unlink_id(Process * c_p)1532 erts_proc_sig_new_unlink_id(Process *c_p)
1533 {
1534 Uint64 id;
1535 ASSERT(c_p);
1536
1537 id = (Uint64) c_p->uniq++;
1538 if (id == 0)
1539 id = (Uint64) c_p->uniq++;
1540 return id;
1541 }
1542
1543 ERTS_GLB_INLINE Sint
erts_proc_sig_fetch(Process * proc)1544 erts_proc_sig_fetch(Process *proc)
1545 {
1546 Sint res = 0;
1547 ErtsSignal *sig;
1548
1549 ERTS_LC_ASSERT(ERTS_PROC_IS_EXITING(proc)
1550 || ((erts_proc_lc_my_proc_locks(proc)
1551 & (ERTS_PROC_LOCK_MAIN
1552 | ERTS_PROC_LOCK_MSGQ))
1553 == (ERTS_PROC_LOCK_MAIN
1554 | ERTS_PROC_LOCK_MSGQ)));
1555
1556 ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE(proc);
1557 ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(proc, !0);
1558
1559 sig = (ErtsSignal *) proc->sig_inq.first;
1560 if (sig) {
1561 if (ERTS_LIKELY(sig->common.tag != ERTS_PROC_SIG_MSGQ_LEN_OFFS_MARK))
1562 erts_proc_sig_fetch__(proc);
1563 else
1564 res = erts_proc_sig_fetch_msgq_len_offs__(proc);
1565 }
1566
1567 res += proc->sig_qs.len;
1568
1569 ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(proc, !0);
1570
1571 #ifdef ERTS_PROC_SIG_HARD_DEBUG_SIGQ_MSG_LEN
1572 {
1573 Sint len = 0;
1574 ERTS_FOREACH_SIG_PRIVQS(
1575 proc, mp,
1576 {
1577 if (ERTS_SIG_IS_MSG(mp))
1578 len++;
1579 });
1580 ERTS_ASSERT(res == len);
1581 }
1582 #endif
1583
1584 return res;
1585 }
1586
1587 ERTS_GLB_INLINE void
erts_proc_notify_new_sig(Process * rp,erts_aint32_t state,erts_aint32_t enable_flag)1588 erts_proc_notify_new_sig(Process* rp, erts_aint32_t state,
1589 erts_aint32_t enable_flag)
1590 {
1591 if (~(state & (ERTS_PSFLG_EXITING
1592 | ERTS_PSFLG_ACTIVE_SYS
1593 | ERTS_PSFLG_SIG_IN_Q))
1594 | (~state & enable_flag)) {
1595 /* Schedule process... */
1596 state = erts_proc_sys_schedule(rp, state, enable_flag);
1597 }
1598
1599 if (state & ERTS_PSFLG_DIRTY_RUNNING) {
1600 /*
1601 * We ignore ERTS_PSFLG_DIRTY_RUNNING_SYS. For
1602 * more info see erts_execute_dirty_system_task()
1603 * in erl_process.c.
1604 */
1605 erts_make_dirty_proc_handled(rp->common.id, state, -1);
1606 }
1607 }
1608
/*
 * Cancel a pending "set save" request recorded in receive marker
 * block BLKP, if there is one: the marker at index
 * pending_set_save_ix gets its set_save flag cleared and the index
 * is reset to -1. Does nothing when the index is negative.
 */
#undef ERTS_PROC_SIG_RECV_MARK_CLEAR_PENDING_SET_SAVE__
#define ERTS_PROC_SIG_RECV_MARK_CLEAR_PENDING_SET_SAVE__(BLKP)          \
    do {                                                                \
        int pend_ix__ = (BLKP)->pending_set_save_ix;                    \
        if (pend_ix__ >= 0) {                                           \
            ErtsRecvMarker *pend_markp__ = &(BLKP)->marker[pend_ix__];  \
            ASSERT(!pend_markp__->in_msgq);                             \
            ASSERT(pend_markp__->in_sigq);                              \
            ASSERT(pend_markp__->set_save);                             \
            pend_markp__->set_save = 0;                                 \
            (BLKP)->pending_set_save_ix = -1;                           \
        }                                                               \
    } while (0)
1622
/*
 * Release the slot in BLKP that is referred to by old_recv_marker_ix,
 * if that index is non-negative: the 'unused' counter is bumped, the
 * slot's reference is reset to 'undefined', the marker's pass counter
 * is set to ERTS_RECV_MARKER_PASS_MAX, and the index is reset to -1.
 * Only defined when ERTS_SUPPORT_OLD_RECV_MARK_INSTRS is defined.
 */
#undef ERTS_PROC_SIG_RECV_MARK_CLEAR_OLD_MARK__
#ifdef ERTS_SUPPORT_OLD_RECV_MARK_INSTRS

#define ERTS_PROC_SIG_RECV_MARK_CLEAR_OLD_MARK__(BLKP)                  \
    do {                                                                \
        int old_ix__ = (BLKP)->old_recv_marker_ix;                      \
        if (old_ix__ >= 0) {                                            \
            ErtsRecvMarker *old_markp__ = &(BLKP)->marker[old_ix__];    \
            ASSERT((BLKP)->ref[old_ix__] == erts_old_recv_marker_id);   \
            ASSERT(old_markp__->in_sigq);                               \
            ASSERT(!old_markp__->set_save);                             \
            (BLKP)->unused += 1;                                        \
            (BLKP)->ref[old_ix__] = am_undefined;                       \
            old_markp__->pass = ERTS_RECV_MARKER_PASS_MAX;              \
            (BLKP)->old_recv_marker_ix = -1;                            \
        }                                                               \
    } while (0)

#endif
1641
1642 ERTS_GLB_INLINE int
erts_msgq_eq_recv_mark_id__(Eterm term1,Eterm term2)1643 erts_msgq_eq_recv_mark_id__(Eterm term1, Eterm term2)
1644 {
1645 int ix, arity;
1646 Eterm *tp1, *tp2;
1647
1648 ASSERT(term1 == am_free || term1 == am_undefined || term1 == NIL
1649 || is_small(term1) || is_big(term1) || is_internal_ref(term1));
1650 ASSERT(term2 == am_free || term2 == am_undefined || term2 == NIL
1651 || is_small(term2) || is_big(term2) || is_internal_ref(term2));
1652
1653 if (term1 == term2)
1654 return !0;
1655
1656 if (!is_boxed(term1) || !is_boxed(term2))
1657 return 0;
1658
1659 tp1 = boxed_val(term1);
1660 tp2 = boxed_val(term2);
1661
1662 if (*tp1 != *tp2)
1663 return 0;
1664
1665 arity = (int) thing_arityval(*tp1);
1666 for (ix = 1; ix <= arity; ix++) {
1667 if (tp1[ix] != tp2[ix])
1668 return 0;
1669 }
1670 return !0;
1671 }
1672
1673 ERTS_GLB_INLINE void
erts_msgq_recv_marker_set_save__(Process * c_p,ErtsRecvMarkerBlock * blkp,ErtsRecvMarker * markp,int ix)1674 erts_msgq_recv_marker_set_save__(Process *c_p,
1675 ErtsRecvMarkerBlock *blkp,
1676 ErtsRecvMarker *markp,
1677 int ix)
1678 {
1679 ERTS_PROC_SIG_RECV_MARK_CLEAR_PENDING_SET_SAVE__(blkp);
1680
1681 ASSERT(markp->proc == c_p);
1682 ASSERT(!markp->set_save);
1683 ASSERT(markp->in_sigq);
1684
1685 if (markp->in_msgq) {
1686 ErtsMessage **sigpp = &markp->sig.common.next;
1687 if (*sigpp && ERTS_SIG_IS_RECV_MARKER(*sigpp))
1688 sigpp = erts_msgq_pass_recv_markers(c_p, sigpp);
1689 c_p->sig_qs.save = sigpp;
1690 }
1691 else {
1692 /*
1693 * Marker is in the middle queue of signals not
1694 * processed yet. Trigger handling of signals in loop_rec
1695 * by setting save pointer to the end of message queue
1696 * (inner queue). This in order to get the recv marker
1697 * into the message queue.
1698 */
1699 c_p->sig_qs.save = c_p->sig_qs.last;
1700 ASSERT(!(*c_p->sig_qs.save));
1701 /*
1702 * Set save pointer when marker enters message queue...
1703 */
1704 markp->set_save = !0;
1705 ASSERT(blkp->pending_set_save_ix == -1);
1706 ASSERT(ix == ERTS_RECV_MARKER_IX__(blkp, markp));
1707 blkp->pending_set_save_ix = ix;
1708 }
1709 }
1710
1711 ERTS_GLB_INLINE void
erts_msgq_recv_marker_clear(Process * c_p,Eterm id)1712 erts_msgq_recv_marker_clear(Process *c_p, Eterm id)
1713 {
1714 ErtsRecvMarkerBlock *blkp = c_p->sig_qs.recv_mrk_blk;
1715 int ix;
1716
1717 if (!is_small(id) && !is_big(id) && !is_internal_ref(id))
1718 return;
1719
1720 if (!blkp)
1721 return;
1722
1723 #ifdef ERTS_SUPPORT_OLD_RECV_MARK_INSTRS
1724 if (id == erts_old_recv_marker_id) {
1725 ERTS_PROC_SIG_RECV_MARK_CLEAR_OLD_MARK__(blkp);
1726 return;
1727 }
1728 #endif
1729
1730 for (ix = 0; ix < ERTS_RECV_MARKER_BLOCK_SIZE; ix++) {
1731 if (erts_msgq_eq_recv_mark_id__(blkp->ref[ix], id)) {
1732 blkp->unused++;
1733 blkp->ref[ix] = am_undefined;
1734 blkp->marker[ix].pass = ERTS_RECV_MARKER_PASS_MAX;
1735 break;
1736 }
1737 }
1738 }
1739
1740 ERTS_GLB_INLINE Eterm
erts_msgq_recv_marker_insert(Process * c_p)1741 erts_msgq_recv_marker_insert(Process *c_p)
1742 {
1743 erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ);
1744 erts_proc_sig_fetch(c_p);
1745 erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ);
1746
1747 if (c_p->sig_qs.cont || c_p->sig_qs.first)
1748 return erts_msgq_recv_marker_create_insert(c_p, am_new_uniq);
1749 return am_undefined;
1750 }
1751
erts_msgq_recv_marker_bind(Process * c_p,Eterm insert_id,Eterm bind_id)1752 ERTS_GLB_INLINE void erts_msgq_recv_marker_bind(Process *c_p,
1753 Eterm insert_id,
1754 Eterm bind_id)
1755 {
1756 #ifdef ERTS_SUPPORT_OLD_RECV_MARK_INSTRS
1757 ASSERT(bind_id != erts_old_recv_marker_id);
1758 #endif
1759
1760 if (is_small(insert_id) || is_big(insert_id)) {
1761 ErtsRecvMarkerBlock *blkp = c_p->sig_qs.recv_mrk_blk;
1762
1763 if (blkp) {
1764 int ix;
1765 for (ix = 0; ix < ERTS_RECV_MARKER_BLOCK_SIZE; ix++) {
1766 if (erts_msgq_eq_recv_mark_id__(blkp->ref[ix], insert_id)) {
1767 if (is_internal_ref(bind_id))
1768 blkp->ref[ix] = bind_id;
1769 else {
1770 blkp->unused++;
1771 blkp->ref[ix] = am_undefined;
1772 blkp->marker[ix].pass = ERTS_RECV_MARKER_PASS_MAX;
1773 }
1774 break;
1775 }
1776 }
1777 }
1778 }
1779 }
1780
1781
1782 ERTS_GLB_INLINE void
erts_msgq_recv_marker_insert_bind(Process * c_p,Eterm id)1783 erts_msgq_recv_marker_insert_bind(Process *c_p, Eterm id)
1784 {
1785 if (is_internal_ref(id)) {
1786 #ifdef ERTS_SUPPORT_OLD_RECV_MARK_INSTRS
1787 ErtsRecvMarkerBlock *blkp = c_p->sig_qs.recv_mrk_blk;
1788 if (blkp && erts_old_recv_marker_id == id)
1789 ERTS_PROC_SIG_RECV_MARK_CLEAR_OLD_MARK__(blkp);
1790 #endif
1791
1792 erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ);
1793 erts_proc_sig_fetch(c_p);
1794 erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ);
1795
1796 if (c_p->sig_qs.cont || c_p->sig_qs.first)
1797 (void) erts_msgq_recv_marker_create_insert(c_p, id);
1798 }
1799 }
1800
1801 ERTS_GLB_INLINE void
erts_msgq_recv_marker_set_save(Process * c_p,Eterm id)1802 erts_msgq_recv_marker_set_save(Process *c_p, Eterm id)
1803 {
1804 if (is_internal_ref(id)) {
1805 ErtsRecvMarkerBlock *blkp = c_p->sig_qs.recv_mrk_blk;
1806
1807 if (blkp) {
1808 int ix;
1809 for (ix = 0; ix < ERTS_RECV_MARKER_BLOCK_SIZE; ix++) {
1810 if (erts_msgq_eq_recv_mark_id__(blkp->ref[ix], id)) {
1811 ErtsRecvMarker *markp = &blkp->marker[ix];
1812 erts_msgq_recv_marker_set_save__(c_p, blkp, markp, ix);
1813 break;
1814 }
1815 }
1816 }
1817
1818 }
1819 }
1820
1821 ERTS_GLB_INLINE ErtsMessage *
erts_msgq_peek_msg(Process * c_p)1822 erts_msgq_peek_msg(Process *c_p)
1823 {
1824 ASSERT(!(*c_p->sig_qs.save) || ERTS_SIG_IS_MSG(*c_p->sig_qs.save));
1825 return *c_p->sig_qs.save;
1826 }
1827
1828 ERTS_GLB_INLINE void
erts_msgq_unlink_msg(Process * c_p,ErtsMessage * msgp)1829 erts_msgq_unlink_msg(Process *c_p, ErtsMessage *msgp)
1830 {
1831 ErtsMessage *sigp = msgp->next;
1832 ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE__(c_p, 0, "before");
1833 *c_p->sig_qs.save = sigp;
1834 c_p->sig_qs.len--;
1835 if (sigp && ERTS_SIG_IS_RECV_MARKER(sigp)) {
1836 ErtsMessage **sigpp = c_p->sig_qs.save;
1837 ((ErtsRecvMarker *) sigp)->prev_next = sigpp;
1838 c_p->sig_qs.save = erts_msgq_pass_recv_markers(c_p, sigpp);
1839 sigp = *c_p->sig_qs.save;
1840 }
1841 if (!sigp)
1842 c_p->sig_qs.last = c_p->sig_qs.save;
1843 ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE__(c_p, 0, "after");
1844 }
1845
1846 ERTS_GLB_INLINE void
erts_msgq_set_save_first(Process * c_p)1847 erts_msgq_set_save_first(Process *c_p)
1848 {
1849 ErtsRecvMarkerBlock *blkp = c_p->sig_qs.recv_mrk_blk;
1850 if (blkp) {
1851 ERTS_PROC_SIG_RECV_MARK_CLEAR_PENDING_SET_SAVE__(blkp);
1852 #ifdef ERTS_SUPPORT_OLD_RECV_MARK_INSTRS
1853 ERTS_PROC_SIG_RECV_MARK_CLEAR_OLD_MARK__(blkp);
1854 #endif
1855 }
1856
1857 /*
1858 * Remove any receive markers at the front of the
1859 * message queue, since they don't have any purpose
1860 * anymore...
1861 */
1862 if (c_p->sig_qs.first && ERTS_SIG_IS_RECV_MARKER(c_p->sig_qs.first))
1863 erts_msgq_remove_leading_recv_markers(c_p);
1864 c_p->sig_qs.save = &c_p->sig_qs.first;
1865 }
1866
1867 ERTS_GLB_INLINE void
erts_msgq_set_save_next(Process * c_p)1868 erts_msgq_set_save_next(Process *c_p)
1869 {
1870 ErtsMessage *sigp = (*c_p->sig_qs.save)->next;
1871 ErtsMessage **sigpp = &(*c_p->sig_qs.save)->next;
1872 ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0);
1873 if (sigp && ERTS_SIG_IS_RECV_MARKER(sigp))
1874 sigpp = erts_msgq_pass_recv_markers(c_p, sigpp);
1875 c_p->sig_qs.save = sigpp;
1876 ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0);
1877 }
1878
1879 ERTS_GLB_INLINE void
erts_msgq_set_save_end(Process * c_p)1880 erts_msgq_set_save_end(Process *c_p)
1881 {
1882 /* Set save pointer to end of message queue... */
1883
1884 erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ);
1885 erts_proc_sig_fetch(c_p);
1886 erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ);
1887
1888 if (!c_p->sig_qs.cont)
1889 c_p->sig_qs.save = c_p->sig_qs.last;
1890 else {
1891 /*
1892 * Unhandled signals in middle queue; we need to
1893 * pass a receive marker through it...
1894 */
1895 erts_msgq_recv_marker_create_insert_set_save(c_p, NIL);
1896 }
1897 }
1898
1899 #undef ERTS_PROC_SIG_RECV_MARK_CLEAR_PENDING_SET_SAVE__
1900 #undef ERTS_PROC_SIG_RECV_MARK_CLEAR_OLD_MARK__
1901
1902 #endif /* ERTS_GLB_INLINE_INCL_FUNC_DEF */
1903
1904 #endif /* ERTS_PROC_SIG_QUEUE_H__ */
1905