1 /* 2 * %CopyrightBegin% 3 * 4 * Copyright Ericsson AB 1996-2020. All Rights Reserved. 5 * 6 * Licensed under the Apache License, Version 2.0 (the "License"); 7 * you may not use this file except in compliance with the License. 8 * You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 * 18 * %CopyrightEnd% 19 */ 20 21 #ifndef __DIST_H__ 22 #define __DIST_H__ 23 24 #include "erl_process.h" 25 #include "erl_node_tables.h" 26 #include "zlib.h" 27 28 #define DFLAG_PUBLISHED ((Uint64)0x01) 29 #define DFLAG_ATOM_CACHE ((Uint64)0x02) 30 #define DFLAG_EXTENDED_REFERENCES ((Uint64)0x04) 31 #define DFLAG_DIST_MONITOR ((Uint64)0x08) 32 #define DFLAG_FUN_TAGS ((Uint64)0x10) 33 #define DFLAG_DIST_MONITOR_NAME ((Uint64)0x20) 34 #define DFLAG_HIDDEN_ATOM_CACHE ((Uint64)0x40) 35 #define DFLAG_NEW_FUN_TAGS ((Uint64)0x80) 36 #define DFLAG_EXTENDED_PIDS_PORTS ((Uint64)0x100) 37 #define DFLAG_EXPORT_PTR_TAG ((Uint64)0x200) 38 #define DFLAG_BIT_BINARIES ((Uint64)0x400) 39 #define DFLAG_NEW_FLOATS ((Uint64)0x800) 40 #define DFLAG_UNICODE_IO ((Uint64)0x1000) 41 #define DFLAG_DIST_HDR_ATOM_CACHE ((Uint64)0x2000) 42 #define DFLAG_SMALL_ATOM_TAGS ((Uint64)0x4000) 43 #define DFLAG_ETS_COMPRESSED ((Uint64)0x8000) /* internal */ 44 #define DFLAG_UTF8_ATOMS ((Uint64)0x10000) 45 #define DFLAG_MAP_TAG ((Uint64)0x20000) 46 #define DFLAG_BIG_CREATION ((Uint64)0x40000) 47 #define DFLAG_SEND_SENDER ((Uint64)0x80000) 48 #define DFLAG_BIG_SEQTRACE_LABELS ((Uint64)0x100000) 49 #define DFLAG_PENDING_CONNECT ((Uint64)0x200000) /* internal */ 50 #define DFLAG_EXIT_PAYLOAD ((Uint64)0x400000) 51 #define DFLAG_FRAGMENTS ((Uint64)0x800000) 52 #define DFLAG_HANDSHAKE_23 ((Uint64)0x1000000) 53 #define DFLAG_UNLINK_ID ((Uint64)0x2000000) 54 #define DFLAG_RESERVED ((Uint64)0xfc000000) 55 /* 56 * As the old handshake only support 32 flag bits, we reserve the remaining 57 * bits in the lower 32 for changes in the handshake protocol or potentially 58 * new capabilities that we also want to backport to OTP-22 or older. 59 */ 60 #define DFLAG_SPAWN (((Uint64)0x1) << 32) 61 #define DFLAG_NAME_ME (((Uint64)0x2) << 32) 62 #define DFLAG_NAME_ME (((Uint64)0x2) << 32) 63 64 65 /* Mandatory flags for distribution */ 66 #define DFLAG_DIST_MANDATORY (DFLAG_EXTENDED_REFERENCES \ 67 | DFLAG_EXTENDED_PIDS_PORTS \ 68 | DFLAG_UTF8_ATOMS \ 69 | DFLAG_NEW_FUN_TAGS \ 70 | DFLAG_BIG_CREATION) 71 72 /* 73 * Additional optimistic flags when encoding toward pending connection. 74 * If remote node (erl_interface) does not support these then we may need 75 * to transcode messages enqueued before connection setup was finished. 76 */ 77 #define DFLAG_DIST_HOPEFULLY (DFLAG_EXPORT_PTR_TAG \ 78 | DFLAG_BIT_BINARIES \ 79 | DFLAG_DIST_MONITOR \ 80 | DFLAG_DIST_MONITOR_NAME \ 81 | DFLAG_SPAWN \ 82 | DFLAG_UNLINK_ID) 83 84 /* Our preferred set of flags. Used for connection setup handshake */ 85 #define DFLAG_DIST_DEFAULT (DFLAG_DIST_MANDATORY | DFLAG_DIST_HOPEFULLY \ 86 | DFLAG_FUN_TAGS \ 87 | DFLAG_NEW_FLOATS \ 88 | DFLAG_UNICODE_IO \ 89 | DFLAG_DIST_HDR_ATOM_CACHE \ 90 | DFLAG_SMALL_ATOM_TAGS \ 91 | DFLAG_UTF8_ATOMS \ 92 | DFLAG_MAP_TAG \ 93 | DFLAG_SEND_SENDER \ 94 | DFLAG_BIG_SEQTRACE_LABELS \ 95 | DFLAG_EXIT_PAYLOAD \ 96 | DFLAG_FRAGMENTS \ 97 | DFLAG_HANDSHAKE_23 \ 98 | DFLAG_SPAWN \ 99 | DFLAG_UNLINK_ID) 100 101 /* Flags addable by local distr implementations */ 102 #define DFLAG_DIST_ADDABLE DFLAG_DIST_DEFAULT 103 104 /* Flags rejectable by local distr implementation */ 105 #define DFLAG_DIST_REJECTABLE (DFLAG_DIST_HDR_ATOM_CACHE \ 106 | DFLAG_HIDDEN_ATOM_CACHE \ 107 | DFLAG_FRAGMENTS \ 108 | DFLAG_ATOM_CACHE) 109 110 /* Flags for all features needing strict order delivery */ 111 #define DFLAG_DIST_STRICT_ORDER DFLAG_DIST_HDR_ATOM_CACHE 112 113 /* All flags that should be enabled when term_to_binary/1 is used. */ 114 #define TERM_TO_BINARY_DFLAGS (DFLAG_EXTENDED_REFERENCES \ 115 | DFLAG_NEW_FUN_TAGS \ 116 | DFLAG_NEW_FLOATS \ 117 | DFLAG_EXTENDED_PIDS_PORTS \ 118 | DFLAG_EXPORT_PTR_TAG \ 119 | DFLAG_BIT_BINARIES \ 120 | DFLAG_MAP_TAG \ 121 | DFLAG_BIG_CREATION) 122 123 /* opcodes used in distribution messages */ 124 enum dop { 125 DOP_LINK = 1, 126 DOP_SEND = 2, 127 DOP_EXIT = 3, 128 DOP_UNLINK = 4, 129 /* Ancient DOP_NODE_LINK (5) was here, can be reused */ 130 DOP_REG_SEND = 6, 131 DOP_GROUP_LEADER = 7, 132 DOP_EXIT2 = 8, 133 134 DOP_SEND_TT = 12, 135 DOP_EXIT_TT = 13, 136 DOP_REG_SEND_TT = 16, 137 DOP_EXIT2_TT = 18, 138 139 DOP_MONITOR_P = 19, 140 DOP_DEMONITOR_P = 20, 141 DOP_MONITOR_P_EXIT = 21, 142 143 DOP_SEND_SENDER = 22, 144 DOP_SEND_SENDER_TT = 23, 145 146 /* These are used when DFLAG_EXIT_PAYLOAD is detected */ 147 DOP_PAYLOAD_EXIT = 24, 148 DOP_PAYLOAD_EXIT_TT = 25, 149 DOP_PAYLOAD_EXIT2 = 26, 150 DOP_PAYLOAD_EXIT2_TT = 27, 151 DOP_PAYLOAD_MONITOR_P_EXIT = 28, 152 153 DOP_SPAWN_REQUEST = 29, 154 DOP_SPAWN_REQUEST_TT = 30, 155 DOP_SPAWN_REPLY = 31, 156 DOP_SPAWN_REPLY_TT = 32, 157 DOP_UNLINK_ID = 35, 158 DOP_UNLINK_ID_ACK = 36 159 }; 160 161 #define ERTS_DIST_SPAWN_FLAG_LINK (1 << 0) 162 #define ERTS_DIST_SPAWN_FLAG_MONITOR (1 << 1) 163 164 /* distribution trap functions */ 165 extern Export* dmonitor_node_trap; 166 167 typedef enum { 168 ERTS_DSP_NO_LOCK, 169 ERTS_DSP_RLOCK 170 } ErtsDSigPrepLock; 171 172 173 /* Must be larger or equal to 16 */ 174 #ifdef DEBUG 175 #define ERTS_DIST_FRAGMENT_SIZE 1024 176 #else 177 /* This should be made configurable */ 178 #define ERTS_DIST_FRAGMENT_SIZE (64 * 1024) 179 #endif 180 181 #define ERTS_DIST_FRAGMENT_HEADER_SIZE (1 + 1 + 8 + 8) /* magic, header, seq id, frag id*/ 182 183 #define ERTS_DE_BUSY_LIMIT (1024*1024) 184 extern int erts_dist_buf_busy_limit; 185 extern int erts_is_alive; 186 187 /* 188 * erts_dsig_prepare() prepares a send of a distributed signal. 189 * One of the values defined below are returned. 190 */ 191 192 /* Connected; signals can be enqueued and sent. */ 193 #define ERTS_DSIG_PREP_CONNECTED 0 194 /* Not connected; connection needs to be set up. */ 195 #define ERTS_DSIG_PREP_NOT_CONNECTED 1 196 /* Caller would be suspended on send operation. */ 197 #define ERTS_DSIG_PREP_WOULD_SUSPEND 2 198 /* System not alive (distributed) */ 199 #define ERTS_DSIG_PREP_NOT_ALIVE 3 200 /* Pending connection; signals can be enqueued */ 201 #define ERTS_DSIG_PREP_PENDING 4 202 203 /* dist_ctrl_{g,s}et_option/2 */ 204 #define ERTS_DIST_CTRL_OPT_GET_SIZE ((Uint32) (1 << 0)) 205 206 /* for emulator internal testing... */ 207 extern int erts_dflags_test_remove_hopefull_flags; 208 209 #ifdef DEBUG 210 #define ERTS_DBG_CHK_NO_DIST_LNK(D, R, L) \ 211 erts_dbg_chk_no_dist_proc_link((D), (R), (L)) 212 #else 213 #define ERTS_DBG_CHK_NO_DIST_LNK(D, R, L) 214 #endif 215 216 /* Define for testing */ 217 /* #define EXTREME_TTB_TRAPPING 1 */ 218 219 #ifndef EXTREME_TTB_TRAPPING 220 #define TERM_TO_BINARY_LOOP_FACTOR 32 221 #else 222 #define TERM_TO_BINARY_LOOP_FACTOR 1 223 #endif 224 225 typedef enum { TTBSize, TTBEncode, TTBCompress } TTBState; 226 typedef struct TTBSizeContext_ { 227 Uint64 dflags; 228 int level; 229 Sint vlen; 230 int iovec; 231 Uint fragment_size; 232 Uint last_result; 233 Uint extra_size; 234 Uint result; 235 Eterm obj; 236 ErtsWStack wstack; 237 } TTBSizeContext; 238 239 #define ERTS_INIT_TTBSizeContext(Ctx, Flags) \ 240 do { \ 241 (Ctx)->wstack.wstart = NULL; \ 242 (Ctx)->dflags = (Flags); \ 243 (Ctx)->level = 0; \ 244 (Ctx)->vlen = -1; \ 245 (Ctx)->fragment_size = ~((Uint) 0); \ 246 (Ctx)->extra_size = 0; \ 247 (Ctx)->last_result = 0; \ 248 } while (0) 249 250 typedef struct TTBEncodeContext_ { 251 Uint64 dflags; 252 Uint64 hopefull_flags; 253 byte *hopefull_flagsp; 254 int level; 255 byte* ep; 256 Eterm obj; 257 ErtsWStack wstack; 258 Binary *result_bin; 259 byte *cptr; 260 Sint vlen; 261 Uint size; 262 byte *payload_ixp; 263 byte *hopefull_ixp; 264 SysIOVec* iov; 265 ErlDrvBinary** binv; 266 Eterm *termv; 267 int iovec; 268 Uint fragment_size; 269 Sint frag_ix; 270 ErlIOVec *fragment_eiovs; 271 #ifdef DEBUG 272 int debug_fragments; 273 int debug_vlen; 274 #endif 275 } TTBEncodeContext; 276 277 #define ERTS_INIT_TTBEncodeContext(Ctx, Flags) \ 278 do { \ 279 (Ctx)->wstack.wstart = NULL; \ 280 (Ctx)->dflags = (Flags); \ 281 (Ctx)->level = 0; \ 282 (Ctx)->vlen = 0; \ 283 (Ctx)->size = 0; \ 284 (Ctx)->termv = NULL; \ 285 (Ctx)->iov = NULL; \ 286 (Ctx)->binv = NULL; \ 287 (Ctx)->fragment_size = ~((Uint) 0); \ 288 if ((Flags) & DFLAG_PENDING_CONNECT) { \ 289 (Ctx)->hopefull_flags = 0; \ 290 (Ctx)->hopefull_flagsp = NULL; \ 291 (Ctx)->hopefull_ixp = NULL; \ 292 (Ctx)->payload_ixp = NULL; \ 293 } \ 294 } while (0) 295 296 typedef struct { 297 Uint real_size; 298 Uint dest_len; 299 byte *dbytes; 300 Binary *result_bin; 301 Binary *destination_bin; 302 z_stream stream; 303 } TTBCompressContext; 304 305 typedef struct { 306 int alive; 307 TTBState state; 308 union { 309 TTBSizeContext sc; 310 TTBEncodeContext ec; 311 TTBCompressContext cc; 312 } s; 313 } TTBContext; 314 315 enum erts_dsig_send_phase { 316 ERTS_DSIG_SEND_PHASE_INIT, 317 ERTS_DSIG_SEND_PHASE_MSG_SIZE, 318 ERTS_DSIG_SEND_PHASE_ALLOC, 319 ERTS_DSIG_SEND_PHASE_MSG_ENCODE, 320 ERTS_DSIG_SEND_PHASE_FIN, 321 ERTS_DSIG_SEND_PHASE_SEND 322 }; 323 324 typedef struct erts_dsig_send_context { 325 int connect; 326 int no_suspend; 327 int no_trap; 328 329 Eterm ctl; 330 Eterm msg; 331 Eterm from; 332 Eterm ctl_heap[8]; /* 7-tuple (SPAWN_REQUEST_TT) */ 333 Eterm return_term; 334 335 DistEntry *dep; 336 Eterm node; /* used if dep == NULL */ 337 Eterm cid; 338 Eterm connection_id; 339 int deref_dep; 340 341 enum erts_dsig_send_phase phase; 342 Sint reds; 343 344 Uint data_size, dhdr_ext_size; 345 byte *dhdrp, *extp; 346 ErtsAtomCacheMap *acmp; 347 ErtsDistOutputBuf *obuf; 348 Uint alloced_fragments, fragments; 349 Sint vlen; 350 Uint64 dflags; 351 Process *c_p; 352 union { 353 TTBSizeContext sc; 354 TTBEncodeContext ec; 355 }u; 356 357 } ErtsDSigSendContext; 358 359 typedef struct dist_sequences DistSeqNode; 360 361 struct dist_sequences { 362 ErlHeapFragment hfrag; 363 struct dist_sequences *parent; 364 struct dist_sequences *left; 365 struct dist_sequences *right; 366 char is_red; 367 368 Uint64 seq_id; 369 int cnt; 370 Sint ctl_len; 371 }; 372 373 /* 374 * erts_dsig_send_* return values. 375 */ 376 #define ERTS_DSIG_SEND_OK 0 377 #define ERTS_DSIG_SEND_YIELD 1 378 #define ERTS_DSIG_SEND_CONTINUE 2 379 #define ERTS_DSIG_SEND_TOO_LRG 3 380 381 extern int erts_dsig_send_msg(ErtsDSigSendContext*, Eterm, Eterm); 382 extern int erts_dsig_send_reg_msg(ErtsDSigSendContext*, Eterm, Eterm, Eterm); 383 extern int erts_dsig_send_link(ErtsDSigSendContext *, Eterm, Eterm); 384 extern int erts_dsig_send_exit_tt(ErtsDSigSendContext *, Process *, Eterm, Eterm, Eterm); 385 extern int erts_dsig_send_unlink(ErtsDSigSendContext *, Eterm, Eterm, Uint64); 386 extern int erts_dsig_send_unlink_ack(ErtsDSigSendContext *, Eterm, Eterm, Uint64); 387 extern int erts_dsig_send_group_leader(ErtsDSigSendContext *, Eterm, Eterm); 388 extern int erts_dsig_send_exit(ErtsDSigSendContext *, Eterm, Eterm, Eterm); 389 extern int erts_dsig_send_exit2(ErtsDSigSendContext *, Eterm, Eterm, Eterm); 390 extern int erts_dsig_send_demonitor(ErtsDSigSendContext *, Eterm, Eterm, Eterm); 391 extern int erts_dsig_send_monitor(ErtsDSigSendContext *, Eterm, Eterm, Eterm); 392 extern int erts_dsig_send_m_exit(ErtsDSigSendContext *, Eterm, Eterm, Eterm, Eterm); 393 extern int erts_dsig_send_spawn_reply(ErtsDSigSendContext *, Eterm, Eterm, Eterm, Eterm, Eterm); 394 395 extern int erts_dsig_send(ErtsDSigSendContext *dsdp); 396 extern int erts_dsend_context_dtor(Binary*); 397 extern Eterm erts_dsend_export_trap_context(Process* p, ErtsDSigSendContext* ctx); 398 399 extern int erts_dist_command(Port *prt, int reds); 400 extern void erts_dist_port_not_busy(Port *prt); 401 extern void erts_kill_dist_connection(DistEntry *dep, Uint32); 402 403 extern Uint erts_dist_cache_size(void); 404 405 extern Sint erts_abort_pending_connection_rwunlock(DistEntry *dep, int *); 406 407 extern void erts_debug_dist_seq_tree_foreach( 408 DistEntry *dep, 409 int (*func)(DistSeqNode *, void*, Sint), void *args); 410 411 extern int erts_dsig_prepare(ErtsDSigSendContext *, 412 DistEntry*, 413 Process *, 414 ErtsProcLocks, 415 ErtsDSigPrepLock, 416 int, 417 int, 418 int); 419 420 void erts_dist_print_procs_suspended_on_de(fmtfn_t to, void *to_arg); 421 int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks); 422 423 Uint erts_ttb_iov_size(int use_termv, Sint vlen, Uint fragments); 424 void erts_ttb_iov_init(TTBEncodeContext *ctx, int use_termv, char *ptr, 425 Sint vlen, Uint fragments, Uint fragments_size); 426 #endif 427