1 /*
2  * %CopyrightBegin%
3  *
4  * Copyright Ericsson AB 1996-2020. All Rights Reserved.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * %CopyrightEnd%
19  */
20 
21 #ifndef __DIST_H__
22 #define __DIST_H__
23 
24 #include "erl_process.h"
25 #include "erl_node_tables.h"
26 #include "zlib.h"
27 
/*
 * Distribution flags occupying the lower 32 bits of the 64-bit flag word.
 * Every flag is a single bit, written as a shift so the bit number is
 * visible directly.
 */
#define DFLAG_PUBLISHED             (((Uint64)0x1) << 0)
#define DFLAG_ATOM_CACHE            (((Uint64)0x1) << 1)
#define DFLAG_EXTENDED_REFERENCES   (((Uint64)0x1) << 2)
#define DFLAG_DIST_MONITOR          (((Uint64)0x1) << 3)
#define DFLAG_FUN_TAGS              (((Uint64)0x1) << 4)
#define DFLAG_DIST_MONITOR_NAME     (((Uint64)0x1) << 5)
#define DFLAG_HIDDEN_ATOM_CACHE     (((Uint64)0x1) << 6)
#define DFLAG_NEW_FUN_TAGS          (((Uint64)0x1) << 7)
#define DFLAG_EXTENDED_PIDS_PORTS   (((Uint64)0x1) << 8)
#define DFLAG_EXPORT_PTR_TAG        (((Uint64)0x1) << 9)
#define DFLAG_BIT_BINARIES          (((Uint64)0x1) << 10)
#define DFLAG_NEW_FLOATS            (((Uint64)0x1) << 11)
#define DFLAG_UNICODE_IO            (((Uint64)0x1) << 12)
#define DFLAG_DIST_HDR_ATOM_CACHE   (((Uint64)0x1) << 13)
#define DFLAG_SMALL_ATOM_TAGS       (((Uint64)0x1) << 14)
#define DFLAG_ETS_COMPRESSED        (((Uint64)0x1) << 15) /* internal */
#define DFLAG_UTF8_ATOMS            (((Uint64)0x1) << 16)
#define DFLAG_MAP_TAG               (((Uint64)0x1) << 17)
#define DFLAG_BIG_CREATION          (((Uint64)0x1) << 18)
#define DFLAG_SEND_SENDER           (((Uint64)0x1) << 19)
#define DFLAG_BIG_SEQTRACE_LABELS   (((Uint64)0x1) << 20)
#define DFLAG_PENDING_CONNECT       (((Uint64)0x1) << 21) /* internal */
#define DFLAG_EXIT_PAYLOAD          (((Uint64)0x1) << 22)
#define DFLAG_FRAGMENTS             (((Uint64)0x1) << 23)
#define DFLAG_HANDSHAKE_23          (((Uint64)0x1) << 24)
#define DFLAG_UNLINK_ID             (((Uint64)0x1) << 25)
/* Mask covering bits 26..31, i.e. the unassigned part of the lower word. */
#define DFLAG_RESERVED              (((Uint64)0x3f) << 26)
/*
 * As the old handshake only supports 32 flag bits, we reserve the remaining
 * bits in the lower 32 (DFLAG_RESERVED) for changes in the handshake
 * protocol, or for new capabilities that we also want to backport to
 * OTP-22 or older.
 */
/*
 * Flags located in the upper 32 bits of the 64-bit flag word.
 * Each name is defined exactly once; a redundant second definition of
 * DFLAG_NAME_ME has been dropped.
 */
#define DFLAG_SPAWN            (((Uint64)0x1) << 32)
#define DFLAG_NAME_ME          (((Uint64)0x2) << 32)
63 
64 
/* Flags that are mandatory for distribution. */
#define DFLAG_DIST_MANDATORY            \
    (DFLAG_EXTENDED_REFERENCES          \
     | DFLAG_NEW_FUN_TAGS               \
     | DFLAG_EXTENDED_PIDS_PORTS        \
     | DFLAG_UTF8_ATOMS                 \
     | DFLAG_BIG_CREATION)
71 
/*
 * Extra flags used optimistically when encoding toward a pending
 * connection. Should the remote node (erl_interface) turn out not to
 * support them, messages enqueued before connection setup finished may
 * have to be transcoded.
 */
#define DFLAG_DIST_HOPEFULLY            \
    (DFLAG_DIST_MONITOR                 \
     | DFLAG_DIST_MONITOR_NAME          \
     | DFLAG_EXPORT_PTR_TAG             \
     | DFLAG_BIT_BINARIES               \
     | DFLAG_UNLINK_ID                  \
     | DFLAG_SPAWN)
83 
/*
 * Our preferred set of flags; used for the connection setup handshake.
 * DFLAG_UTF8_ATOMS, DFLAG_SPAWN and DFLAG_UNLINK_ID are spelled out here
 * even though the MANDATORY/HOPEFULLY sets already contain them.
 */
#define DFLAG_DIST_DEFAULT              \
    (DFLAG_DIST_MANDATORY               \
     | DFLAG_DIST_HOPEFULLY             \
     | DFLAG_FUN_TAGS                   \
     | DFLAG_NEW_FLOATS                 \
     | DFLAG_UNICODE_IO                 \
     | DFLAG_DIST_HDR_ATOM_CACHE        \
     | DFLAG_SMALL_ATOM_TAGS            \
     | DFLAG_UTF8_ATOMS                 \
     | DFLAG_MAP_TAG                    \
     | DFLAG_SEND_SENDER                \
     | DFLAG_BIG_SEQTRACE_LABELS        \
     | DFLAG_EXIT_PAYLOAD               \
     | DFLAG_FRAGMENTS                  \
     | DFLAG_HANDSHAKE_23               \
     | DFLAG_SPAWN                      \
     | DFLAG_UNLINK_ID)
100 
/* Flags that local distribution implementations may add. */
#define DFLAG_DIST_ADDABLE DFLAG_DIST_DEFAULT

/* Flags that a local distribution implementation may reject. */
#define DFLAG_DIST_REJECTABLE           \
    (DFLAG_ATOM_CACHE                   \
     | DFLAG_HIDDEN_ATOM_CACHE          \
     | DFLAG_DIST_HDR_ATOM_CACHE        \
     | DFLAG_FRAGMENTS)

/* Flags for all features that need strict order delivery. */
#define DFLAG_DIST_STRICT_ORDER DFLAG_DIST_HDR_ATOM_CACHE
112 
/* The complete set of flags to enable when term_to_binary/1 is used. */
#define TERM_TO_BINARY_DFLAGS           \
    (DFLAG_EXTENDED_REFERENCES          \
     | DFLAG_EXTENDED_PIDS_PORTS        \
     | DFLAG_NEW_FUN_TAGS               \
     | DFLAG_EXPORT_PTR_TAG             \
     | DFLAG_NEW_FLOATS                 \
     | DFLAG_BIT_BINARIES               \
     | DFLAG_MAP_TAG                    \
     | DFLAG_BIG_CREATION)
122 
/*
 * Opcodes used in distribution messages.
 * The numeric values are spelled out explicitly for every opcode.
 */
enum dop {
    /* Basic signals */
    DOP_LINK                   = 1,
    DOP_SEND                   = 2,
    DOP_EXIT                   = 3,
    DOP_UNLINK                 = 4,
    /* 5: ancient DOP_NODE_LINK was here, can be reused */
    DOP_REG_SEND               = 6,
    DOP_GROUP_LEADER           = 7,
    DOP_EXIT2                  = 8,

    /* *_TT variants of the basic signals */
    DOP_SEND_TT                = 12,
    DOP_EXIT_TT                = 13,
    DOP_REG_SEND_TT            = 16,
    DOP_EXIT2_TT               = 18,

    /* Process monitors */
    DOP_MONITOR_P              = 19,
    DOP_DEMONITOR_P            = 20,
    DOP_MONITOR_P_EXIT         = 21,

    /* Sends that carry the sender */
    DOP_SEND_SENDER            = 22,
    DOP_SEND_SENDER_TT         = 23,

    /* These are used when DFLAG_EXIT_PAYLOAD is detected */
    DOP_PAYLOAD_EXIT           = 24,
    DOP_PAYLOAD_EXIT_TT        = 25,
    DOP_PAYLOAD_EXIT2          = 26,
    DOP_PAYLOAD_EXIT2_TT       = 27,
    DOP_PAYLOAD_MONITOR_P_EXIT = 28,

    /* Spawn request/reply */
    DOP_SPAWN_REQUEST          = 29,
    DOP_SPAWN_REQUEST_TT       = 30,
    DOP_SPAWN_REPLY            = 31,
    DOP_SPAWN_REPLY_TT         = 32,

    /* Unlink with id, and its acknowledgement */
    DOP_UNLINK_ID              = 35,
    DOP_UNLINK_ID_ACK          = 36
};
160 
/* Bit flags for distributed spawn: bit 0 = link, bit 1 = monitor. */
#define ERTS_DIST_SPAWN_FLAG_LINK       (0x1)
#define ERTS_DIST_SPAWN_FLAG_MONITOR    (0x2)
163 
164 /* distribution trap functions */
165 extern Export* dmonitor_node_trap;
166 
/*
 * Lock mode argument of erts_dsig_prepare().
 * NOTE(review): ERTS_DSP_RLOCK presumably asks for a read lock on the
 * dist entry; confirm against the implementation.
 */
typedef enum {
    ERTS_DSP_NO_LOCK = 0,
    ERTS_DSP_RLOCK   = 1
} ErtsDSigPrepLock;
171 
172 
/*
 * Distribution fragment size. Must be larger than or equal to 16.
 * DEBUG builds use 1024; other builds use 64 * 1024 (this should be made
 * configurable).
 */
#ifndef DEBUG
#define ERTS_DIST_FRAGMENT_SIZE (1 << 16)
#else
#define ERTS_DIST_FRAGMENT_SIZE (1 << 10)
#endif

/* Fragment header size: the sum of the four parts listed below. */
#define ERTS_DIST_FRAGMENT_HEADER_SIZE  \
    (1    /* magic */                   \
     + 1  /* header */                  \
     + 8  /* seq id */                  \
     + 8) /* frag id */
182 
/* Busy limit constant: 1024 * 1024. */
#define ERTS_DE_BUSY_LIMIT (1 << 20)

/* Globals defined outside this header. */
extern int erts_dist_buf_busy_limit;
extern int erts_is_alive;
186 
/*
 * erts_dsig_prepare() prepares a send of a distributed signal.
 * One of the values defined below is returned.
 */
#define ERTS_DSIG_PREP_CONNECTED      0 /* Connected; signals can be enqueued and sent */
#define ERTS_DSIG_PREP_NOT_CONNECTED  1 /* Not connected; connection needs to be set up */
#define ERTS_DSIG_PREP_WOULD_SUSPEND  2 /* Caller would be suspended on send operation */
#define ERTS_DSIG_PREP_NOT_ALIVE      3 /* System not alive (distributed) */
#define ERTS_DSIG_PREP_PENDING        4 /* Pending connection; signals can be enqueued */
202 
/* Option bit for dist_ctrl_get_option/2 and dist_ctrl_set_option/2 (bit 0). */
#define ERTS_DIST_CTRL_OPT_GET_SIZE     ((Uint32) 0x1)
205 
/* For emulator internal testing only. */
extern int erts_dflags_test_remove_hopefull_flags;

/*
 * Debug-only check. In DEBUG builds the macro forwards its three arguments
 * to erts_dbg_chk_no_dist_proc_link(); otherwise it expands to nothing, so
 * the arguments are not evaluated at all.
 */
#ifndef DEBUG
#define ERTS_DBG_CHK_NO_DIST_LNK(D, R, L)
#else
#define ERTS_DBG_CHK_NO_DIST_LNK(D, R, L) \
    erts_dbg_chk_no_dist_proc_link((D), (R), (L))
#endif
215 
/*
 * Loop factor for term_to_binary. Define EXTREME_TTB_TRAPPING (e.g. to 1)
 * for testing; that lowers the factor from 32 to 1.
 */
#ifdef EXTREME_TTB_TRAPPING
#define TERM_TO_BINARY_LOOP_FACTOR 1
#else
#define TERM_TO_BINARY_LOOP_FACTOR 32
#endif
224 
/* Values for the `state` field of TTBContext. */
typedef enum {
    TTBSize     = 0,
    TTBEncode   = 1,
    TTBCompress = 2
} TTBState;
226 typedef struct TTBSizeContext_ {
227     Uint64 dflags;
228     int level;
229     Sint vlen;
230     int iovec;
231     Uint fragment_size;
232     Uint last_result;
233     Uint extra_size;
234     Uint result;
235     Eterm obj;
236     ErtsWStack wstack;
237 } TTBSizeContext;
238 
/*
 * Reset a TTBSizeContext before a size calculation.
 *
 *   Ctx    pointer to the context; expanded once per assignment, so it
 *          must not have side effects
 *   Flags  value stored in Ctx->dflags; expanded once
 *
 * vlen is set to -1 and fragment_size to the largest Uint value. The
 * iovec, result and obj fields are left untouched.
 */
#define ERTS_INIT_TTBSizeContext(Ctx, Flags)                    \
    do {                                                        \
        (Ctx)->wstack.wstart = NULL;                            \
        (Ctx)->dflags        = (Flags);                         \
        (Ctx)->vlen          = -1;                              \
        (Ctx)->level         = 0;                               \
        (Ctx)->last_result   = 0;                               \
        (Ctx)->extra_size    = 0;                               \
        (Ctx)->fragment_size = ~((Uint) 0);                     \
    } while (0)
249 
250 typedef struct TTBEncodeContext_ {
251     Uint64 dflags;
252     Uint64 hopefull_flags;
253     byte *hopefull_flagsp;
254     int level;
255     byte* ep;
256     Eterm obj;
257     ErtsWStack wstack;
258     Binary *result_bin;
259     byte *cptr;
260     Sint vlen;
261     Uint size;
262     byte *payload_ixp;
263     byte *hopefull_ixp;
264     SysIOVec* iov;
265     ErlDrvBinary** binv;
266     Eterm *termv;
267     int iovec;
268     Uint fragment_size;
269     Sint frag_ix;
270     ErlIOVec *fragment_eiovs;
271 #ifdef DEBUG
272     int debug_fragments;
273     int debug_vlen;
274 #endif
275 } TTBEncodeContext;
276 
/*
 * Reset a TTBEncodeContext before encoding starts.
 *
 *   Ctx    pointer to the context; expanded once per assignment, so it
 *          must not have side effects
 *   Flags  distribution flags stored in Ctx->dflags; expanded exactly
 *          once (the DFLAG_PENDING_CONNECT test reads the stored copy
 *          rather than expanding Flags a second time)
 *
 * fragment_size is set to the largest Uint value. The hopefull_* fields
 * and payload_ixp are reset only when DFLAG_PENDING_CONNECT is set in the
 * flags; otherwise they are left untouched. Fields not listed in the body
 * are not initialized by this macro.
 */
#define ERTS_INIT_TTBEncodeContext(Ctx, Flags)                  \
    do {                                                        \
        (Ctx)->wstack.wstart = NULL;                            \
        (Ctx)->dflags = (Flags);                                \
        (Ctx)->level = 0;                                       \
        (Ctx)->vlen = 0;                                        \
        (Ctx)->size = 0;                                        \
        (Ctx)->termv = NULL;                                    \
        (Ctx)->iov = NULL;                                      \
        (Ctx)->binv = NULL;                                     \
        (Ctx)->fragment_size = ~((Uint) 0);                     \
        if ((Ctx)->dflags & DFLAG_PENDING_CONNECT) {            \
            (Ctx)->hopefull_flags = 0;                          \
            (Ctx)->hopefull_flagsp = NULL;                      \
            (Ctx)->hopefull_ixp = NULL;                         \
            (Ctx)->payload_ixp = NULL;                          \
        }                                                       \
    } while (0)
295 
296 typedef struct {
297     Uint real_size;
298     Uint dest_len;
299     byte *dbytes;
300     Binary *result_bin;
301     Binary *destination_bin;
302     z_stream stream;
303 } TTBCompressContext;
304 
305 typedef struct {
306     int alive;
307     TTBState state;
308     union {
309 	TTBSizeContext sc;
310 	TTBEncodeContext ec;
311 	TTBCompressContext cc;
312     } s;
313 } TTBContext;
314 
/* Values for the `phase` field of ErtsDSigSendContext, numbered from 0. */
enum erts_dsig_send_phase {
    ERTS_DSIG_SEND_PHASE_INIT       = 0,
    ERTS_DSIG_SEND_PHASE_MSG_SIZE   = 1,
    ERTS_DSIG_SEND_PHASE_ALLOC      = 2,
    ERTS_DSIG_SEND_PHASE_MSG_ENCODE = 3,
    ERTS_DSIG_SEND_PHASE_FIN        = 4,
    ERTS_DSIG_SEND_PHASE_SEND       = 5
};
323 
324 typedef struct erts_dsig_send_context {
325     int connect;
326     int no_suspend;
327     int no_trap;
328 
329     Eterm ctl;
330     Eterm msg;
331     Eterm from;
332     Eterm ctl_heap[8]; /* 7-tuple (SPAWN_REQUEST_TT) */
333     Eterm return_term;
334 
335     DistEntry *dep;
336     Eterm node;   /* used if dep == NULL */
337     Eterm cid;
338     Eterm connection_id;
339     int deref_dep;
340 
341     enum erts_dsig_send_phase phase;
342     Sint reds;
343 
344     Uint data_size, dhdr_ext_size;
345     byte *dhdrp, *extp;
346     ErtsAtomCacheMap *acmp;
347     ErtsDistOutputBuf *obuf;
348     Uint alloced_fragments, fragments;
349     Sint vlen;
350     Uint64 dflags;
351     Process *c_p;
352     union {
353 	TTBSizeContext sc;
354 	TTBEncodeContext ec;
355     }u;
356 
357 } ErtsDSigSendContext;
358 
359 typedef struct dist_sequences DistSeqNode;
360 
361 struct dist_sequences {
362     ErlHeapFragment hfrag;
363     struct dist_sequences *parent;
364     struct dist_sequences *left;
365     struct dist_sequences *right;
366     char is_red;
367 
368     Uint64 seq_id;
369     int cnt;
370     Sint ctl_len;
371 };
372 
/* Return values of the erts_dsig_send_*() functions. */
#define ERTS_DSIG_SEND_OK        0
#define ERTS_DSIG_SEND_YIELD     1
#define ERTS_DSIG_SEND_CONTINUE  2
#define ERTS_DSIG_SEND_TOO_LRG   3
380 
381 extern int erts_dsig_send_msg(ErtsDSigSendContext*, Eterm, Eterm);
382 extern int erts_dsig_send_reg_msg(ErtsDSigSendContext*, Eterm, Eterm, Eterm);
383 extern int erts_dsig_send_link(ErtsDSigSendContext *, Eterm, Eterm);
384 extern int erts_dsig_send_exit_tt(ErtsDSigSendContext *, Process *, Eterm, Eterm, Eterm);
385 extern int erts_dsig_send_unlink(ErtsDSigSendContext *, Eterm, Eterm, Uint64);
386 extern int erts_dsig_send_unlink_ack(ErtsDSigSendContext *, Eterm, Eterm, Uint64);
387 extern int erts_dsig_send_group_leader(ErtsDSigSendContext *, Eterm, Eterm);
388 extern int erts_dsig_send_exit(ErtsDSigSendContext *, Eterm, Eterm, Eterm);
389 extern int erts_dsig_send_exit2(ErtsDSigSendContext *, Eterm, Eterm, Eterm);
390 extern int erts_dsig_send_demonitor(ErtsDSigSendContext *, Eterm, Eterm, Eterm);
391 extern int erts_dsig_send_monitor(ErtsDSigSendContext *, Eterm, Eterm, Eterm);
392 extern int erts_dsig_send_m_exit(ErtsDSigSendContext *, Eterm, Eterm, Eterm, Eterm);
393 extern int erts_dsig_send_spawn_reply(ErtsDSigSendContext *, Eterm, Eterm, Eterm, Eterm, Eterm);
394 
395 extern int erts_dsig_send(ErtsDSigSendContext *dsdp);
396 extern int erts_dsend_context_dtor(Binary*);
397 extern Eterm erts_dsend_export_trap_context(Process* p, ErtsDSigSendContext* ctx);
398 
399 extern int erts_dist_command(Port *prt, int reds);
400 extern void erts_dist_port_not_busy(Port *prt);
401 extern void erts_kill_dist_connection(DistEntry *dep, Uint32);
402 
403 extern Uint erts_dist_cache_size(void);
404 
405 extern Sint erts_abort_pending_connection_rwunlock(DistEntry *dep, int *);
406 
407 extern void erts_debug_dist_seq_tree_foreach(
408     DistEntry *dep,
409     int (*func)(DistSeqNode *, void*, Sint), void *args);
410 
411 extern int erts_dsig_prepare(ErtsDSigSendContext *,
412                              DistEntry*,
413                              Process *,
414                              ErtsProcLocks,
415                              ErtsDSigPrepLock,
416                              int,
417                              int,
418                              int);
419 
420 void erts_dist_print_procs_suspended_on_de(fmtfn_t to, void *to_arg);
421 int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks);
422 
423 Uint erts_ttb_iov_size(int use_termv, Sint vlen, Uint fragments);
424 void erts_ttb_iov_init(TTBEncodeContext *ctx, int use_termv, char *ptr,
425                        Sint vlen, Uint fragments, Uint fragments_size);
426 #endif
427