1 /******************************************************************************* 2 * thrill/net/connection.hpp 3 * 4 * Contains net::Connection, a richer set of network point-to-point primitives. 5 * 6 * Part of Project Thrill - http://project-thrill.org 7 * 8 * Copyright (C) 2015 Timo Bingmann <tb@panthema.net> 9 * Copyright (C) 2015 Emanuel Jöbstl <emanuel.joebstl@gmail.com> 10 * 11 * All rights reserved. Published under the BSD-2 license in the LICENSE file. 12 ******************************************************************************/ 13 14 #pragma once 15 #ifndef THRILL_NET_CONNECTION_HEADER 16 #define THRILL_NET_CONNECTION_HEADER 17 18 #include <thrill/common/config.hpp> 19 #include <thrill/common/logger.hpp> 20 #include <thrill/common/porting.hpp> 21 #include <thrill/data/serialization.hpp> 22 #include <thrill/net/buffer_builder.hpp> 23 #include <thrill/net/buffer_reader.hpp> 24 #include <thrill/net/exception.hpp> 25 #include <thrill/net/fixed_buffer_builder.hpp> 26 27 #include <array> 28 #include <cassert> 29 #include <cerrno> 30 #include <cstdio> 31 #include <cstring> 32 #include <stdexcept> 33 #include <string> 34 35 namespace thrill { 36 namespace net { 37 38 //! \addtogroup net_layer 39 //! \{ 40 41 /*! 42 * A Connection represents a link to another peer in a network group. The link 43 * need not be an actual stateful TCP connection, but may be reliable and 44 * stateless. 45 * 46 * The Connection class is abstract, and subclasses must exist for every network 47 * implementation. 48 */ 49 class Connection 50 { 51 public: 52 //! flag which enables transmission of verification bytes for debugging, 53 //! this increases network volume. 54 static constexpr bool self_verify_ = common::g_self_verify; 55 56 //! typeid().hash_code() is only guaranteed to be equal for the same program 57 //! run, hence, we can only use it on loopback networks. 58 bool is_loopback_ = false; 59 60 //! Additional flags for sending or receiving. 61 enum Flags : size_t { 62 NoFlags = 0, 63 //! indicate that more data is coming, hence, sending a packet may be 64 //! delayed. currently only applies to TCP. 65 MsgMore = 1 66 }; 67 68 //! operator to combine flags operator |(const Flags & a,const Flags & b)69 friend inline Flags operator | (const Flags& a, const Flags& b) { 70 return static_cast<Flags>( 71 static_cast<size_t>(a) | static_cast<size_t>(b)); 72 } 73 74 //! \name Base Status Functions 75 //! \{ 76 77 //! check whether the connection is (still) valid. 78 virtual bool IsValid() const = 0; 79 80 //! return a string representation of this connection, for user output. 81 virtual std::string ToString() const = 0; 82 83 //! virtual method to output to a std::ostream 84 virtual std::ostream& OutputOstream(std::ostream& os) const = 0; 85 86 //! \} 87 88 //! \name Send Functions 89 //! \{ 90 91 //! Synchronous blocking send of the (data,size) packet. if sending fails, a 92 //! net::Exception is thrown. 93 virtual void SyncSend(const void* data, size_t size, 94 Flags flags = NoFlags) = 0; 95 96 //! Non-blocking send of a (data,size) message. returns number of bytes 97 //! possible to send. check errno for errors. 98 virtual ssize_t SendOne(const void* data, size_t size, 99 Flags flags = NoFlags) = 0; 100 101 //! Send any serializable POD item T. if sending fails, a net::Exception is 102 //! thrown. 103 template <typename T> 104 typename std::enable_if<std::is_pod<T>::value, void>::type Send(const T & value)105 Send(const T& value) { 106 if (self_verify_ && is_loopback_) { 107 // for communication verification, send hash_code. 108 size_t hash_code = typeid(T).hash_code(); 109 SyncSend(&hash_code, sizeof(hash_code)); 110 } 111 // send PODs directly from memory. 112 SyncSend(&value, sizeof(value)); 113 } 114 115 //! Send any serializable non-POD fixed-length item T. if sending fails, a 116 //! net::Exception is thrown. 117 template <typename T> 118 typename std::enable_if< 119 !std::is_pod<T>::value && 120 data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type Send(const T & value)121 Send(const T& value) { 122 if (self_verify_ && is_loopback_) { 123 // for communication verification, send hash_code. 124 size_t hash_code = typeid(T).hash_code(); 125 SyncSend(&hash_code, sizeof(hash_code)); 126 } 127 // fixed_size items can be sent without size header 128 static constexpr size_t fixed_size 129 = data::Serialization<BufferBuilder, T>::fixed_size; 130 if (fixed_size < 2 * 1024 * 1024) { 131 // allocate buffer on stack (no allocation) 132 using FixedBuilder = FixedBufferBuilder<fixed_size>; 133 FixedBuilder fb; 134 data::Serialization<FixedBuilder, T>::Serialize(value, fb); 135 assert(fb.size() == fixed_size); 136 SyncSend(fb.data(), fb.size()); 137 } 138 else { 139 // too big, use heap allocation 140 BufferBuilder bb; 141 data::Serialization<BufferBuilder, T>::Serialize(value, bb); 142 SyncSend(bb.data(), bb.size()); 143 } 144 } 145 146 //! Send any serializable non-POD variable-length item T. if sending fails, 147 //! a net::Exception is thrown. 148 template <typename T> 149 typename std::enable_if< 150 !std::is_pod<T>::value && 151 !data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type Send(const T & value)152 Send(const T& value) { 153 if (self_verify_ && is_loopback_) { 154 // for communication verification, send hash_code. 155 size_t hash_code = typeid(T).hash_code(); 156 SyncSend(&hash_code, sizeof(hash_code)); 157 } 158 // variable length items must be prefixed with size header 159 BufferBuilder bb; 160 data::Serialization<BufferBuilder, T>::Serialize(value, bb); 161 size_t size = bb.size(); 162 SyncSend(&size, sizeof(size), MsgMore); 163 SyncSend(bb.data(), bb.size()); 164 } 165 166 //! \} 167 168 //! \name Receive Functions 169 //! \{ 170 171 //! Synchronous blocking receive a message of given size. The size must 172 //! match the SyncSend size for some network layers may only support 173 //! matching messages (read: RDMA!, but also true for the mock net). Throws 174 //! a net::Exception on errors. 175 virtual void SyncRecv(void* out_data, size_t size) = 0; 176 177 //! Non-blocking receive of at most size data. returns number of bytes 178 //! actually received. check errno for errors. 179 virtual ssize_t RecvOne(void* out_data, size_t size) = 0; 180 181 //! Receive any serializable POD item T. 182 template <typename T> 183 typename std::enable_if<std::is_pod<T>::value, void>::type Receive(T * out_value)184 Receive(T* out_value) { 185 if (self_verify_ && is_loopback_) { 186 // for communication verification, receive hash_code. 187 size_t hash_code; 188 SyncRecv(&hash_code, sizeof(hash_code)); 189 if (hash_code != typeid(T).hash_code()) { 190 throw std::runtime_error( 191 "Connection::Receive() attempted to receive item " 192 "with different typeid!"); 193 } 194 } 195 // receive PODs directly into memory. 196 SyncRecv(out_value, sizeof(*out_value)); 197 } 198 199 //! Receive any serializable non-POD fixed-length item T. 200 template <typename T> 201 typename std::enable_if< 202 !std::is_pod<T>::value && 203 data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type Receive(T * out_value)204 Receive(T* out_value) { 205 if (self_verify_ && is_loopback_) { 206 // for communication verification, receive hash_code. 207 size_t hash_code; 208 SyncRecv(&hash_code, sizeof(hash_code)); 209 if (hash_code != typeid(T).hash_code()) { 210 throw std::runtime_error( 211 "Connection::Receive() attempted to receive item " 212 "with different typeid!"); 213 } 214 } 215 // fixed_size items can be received without size header 216 static constexpr size_t fixed_size 217 = data::Serialization<BufferBuilder, T>::fixed_size; 218 if (fixed_size < 2 * 1024 * 1024) { 219 // allocate buffer on stack (no allocation) 220 std::array<uint8_t, fixed_size> b; 221 SyncRecv(b.data(), b.size()); 222 BufferReader br(b.data(), b.size()); 223 *out_value = data::Serialization<BufferReader, T>::Deserialize(br); 224 } 225 else { 226 // too big, use heap allocation 227 Buffer b(data::Serialization<BufferBuilder, T>::fixed_size); 228 SyncRecv(b.data(), b.size()); 229 BufferReader br(b); 230 *out_value = data::Serialization<BufferReader, T>::Deserialize(br); 231 } 232 } 233 234 //! Receive any serializable non-POD fixed-length item T. 235 template <typename T> 236 typename std::enable_if< 237 !std::is_pod<T>::value && 238 !data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type Receive(T * out_value)239 Receive(T* out_value) { 240 if (self_verify_ && is_loopback_) { 241 // for communication verification, receive hash_code. 242 size_t hash_code; 243 SyncRecv(&hash_code, sizeof(hash_code)); 244 if (hash_code != typeid(T).hash_code()) { 245 throw std::runtime_error( 246 "Connection::Receive() attempted to receive item " 247 "with different typeid!"); 248 } 249 } 250 // variable length items are prefixed with size header 251 size_t size; 252 SyncRecv(&size, sizeof(size)); 253 // receives message 254 Buffer b(size); 255 SyncRecv(b.data(), size); 256 BufferReader br(b); 257 *out_value = data::Serialization<BufferReader, T>::Deserialize(br); 258 } 259 260 //! \} 261 262 //! \name Paired SendReceive Methods 263 //! \{ 264 265 //! Synchronous blocking sending and receive a message of given size. The 266 //! size must match the SyncSendRecv size for some network layers may only 267 //! support matching messages (read: RDMA!, but also true for the mock 268 //! net). Throws a net::Exception on errors. 269 virtual void SyncSendRecv(const void* send_data, size_t send_size, 270 void* recv_data, size_t recv_size) = 0; 271 virtual void SyncRecvSend(const void* send_data, size_t send_size, 272 void* recv_data, size_t recv_size) = 0; 273 274 //! SendReceive any serializable POD item T. 275 template <typename T> 276 typename std::enable_if<std::is_pod<T>::value, void>::type SendReceive(const T * value,T * out_value,size_t n=1)277 SendReceive(const T* value, T* out_value, size_t n = 1) { 278 if (self_verify_ && is_loopback_) { 279 // for communication verification, send/receive hash_code. 280 size_t send_hash_code = typeid(T).hash_code(), recv_hash_code; 281 SyncSendRecv(&send_hash_code, sizeof(send_hash_code), 282 &recv_hash_code, sizeof(recv_hash_code)); 283 if (recv_hash_code != typeid(T).hash_code()) { 284 throw std::runtime_error( 285 "Connection::SendReceive() attempted to receive item " 286 "with different typeid!"); 287 } 288 } 289 290 // receive PODs directly into memory. 291 SyncSendRecv(value, n * sizeof(T), out_value, n * sizeof(T)); 292 } 293 294 template <typename T> 295 typename std::enable_if<std::is_pod<T>::value, void>::type ReceiveSend(const T & value,T * out_value)296 ReceiveSend(const T& value, T* out_value) { 297 if (self_verify_ && is_loopback_) { 298 // for communication verification, send/receive hash_code. 299 size_t send_hash_code = typeid(T).hash_code(), recv_hash_code; 300 SyncRecvSend(&send_hash_code, sizeof(send_hash_code), 301 &recv_hash_code, sizeof(recv_hash_code)); 302 if (recv_hash_code != typeid(T).hash_code()) { 303 throw std::runtime_error( 304 "Connection::SendReceive() attempted to receive item " 305 "with different typeid!"); 306 } 307 } 308 // receive PODs directly into memory. 309 SyncRecvSend(&value, sizeof(value), out_value, sizeof(*out_value)); 310 } 311 312 //! SendReceive any serializable non-POD fixed-length item T. 313 template <typename T> 314 typename std::enable_if< 315 !std::is_pod<T>::value && 316 data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type SendReceive(const T * value,T * out_value,size_t n=1)317 SendReceive(const T* value, T* out_value, size_t n = 1) { 318 if (self_verify_ && is_loopback_) { 319 // for communication verification, send/receive hash_code. 320 size_t send_hash_code = typeid(T).hash_code(), recv_hash_code; 321 SyncSendRecv(&send_hash_code, sizeof(send_hash_code), 322 &recv_hash_code, sizeof(recv_hash_code)); 323 if (recv_hash_code != typeid(T).hash_code()) { 324 throw std::runtime_error( 325 "Connection::SendReceive() attempted to receive item " 326 "with different typeid!"); 327 } 328 } 329 330 // fixed_size items can be sent/recv without size header 331 BufferBuilder sendb(n * data::Serialization<BufferBuilder, T>::fixed_size); 332 for (size_t i = 0; i < n; ++i) { 333 data::Serialization<BufferBuilder, T>::Serialize(value[i], sendb); 334 } 335 Buffer recvb(n * data::Serialization<BufferBuilder, T>::fixed_size); 336 SyncSendRecv(sendb.data(), sendb.size(), 337 recvb.data(), recvb.size()); 338 BufferReader br(recvb); 339 for (size_t i = 0; i < n; ++i) { 340 out_value[i] = data::Serialization<BufferReader, T>::Deserialize(br); 341 } 342 } 343 344 template <typename T> 345 typename std::enable_if< 346 !std::is_pod<T>::value && 347 data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type ReceiveSend(const T & value,T * out_value)348 ReceiveSend(const T& value, T* out_value) { 349 if (self_verify_ && is_loopback_) { 350 // for communication verification, send/receive hash_code. 351 size_t send_hash_code = typeid(T).hash_code(), recv_hash_code; 352 SyncRecvSend(&send_hash_code, sizeof(send_hash_code), 353 &recv_hash_code, sizeof(recv_hash_code)); 354 if (recv_hash_code != typeid(T).hash_code()) { 355 throw std::runtime_error( 356 "Connection::SendReceive() attempted to receive item " 357 "with different typeid!"); 358 } 359 } 360 361 // fixed_size items can be sent/recv without size header 362 static constexpr size_t fixed_size 363 = data::Serialization<BufferBuilder, T>::fixed_size; 364 if (fixed_size < 2 * 1024 * 1024) { 365 // allocate buffer on stack (no allocation) 366 using FixedBuilder = FixedBufferBuilder<fixed_size>; 367 FixedBuilder sendb; 368 data::Serialization<FixedBuilder, T>::Serialize(value, sendb); 369 assert(sendb.size() == fixed_size); 370 std::array<uint8_t, fixed_size> recvb; 371 SyncRecvSend(sendb.data(), sendb.size(), 372 recvb.data(), recvb.size()); 373 BufferReader br(recvb.data(), recvb.size()); 374 *out_value = data::Serialization<BufferReader, T>::Deserialize(br); 375 } 376 else { 377 // too big, use heap allocation 378 BufferBuilder sendb; 379 data::Serialization<BufferBuilder, T>::Serialize(value, sendb); 380 Buffer recvb(data::Serialization<BufferBuilder, T>::fixed_size); 381 SyncRecvSend(sendb.data(), sendb.size(), 382 recvb.data(), recvb.size()); 383 BufferReader br(recvb); 384 *out_value = data::Serialization<BufferReader, T>::Deserialize(br); 385 } 386 } 387 388 //! SendReceive any serializable non-POD fixed-length item T. 389 template <typename T> 390 typename std::enable_if< 391 !std::is_pod<T>::value && 392 !data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type SendReceive(const T * value,T * out_value,size_t n=1)393 SendReceive(const T* value, T* out_value, size_t n = 1) { 394 if (self_verify_ && is_loopback_) { 395 // for communication verification, send/receive hash_code. 396 size_t send_hash_code = typeid(T).hash_code(), recv_hash_code; 397 SyncSendRecv(&send_hash_code, sizeof(send_hash_code), 398 &recv_hash_code, sizeof(recv_hash_code)); 399 if (recv_hash_code != typeid(T).hash_code()) { 400 throw std::runtime_error( 401 "Connection::SendReceive() attempted to receive item " 402 "with different typeid!"); 403 } 404 } 405 // variable length items must be prefixed with size header 406 BufferBuilder sendb; 407 for (size_t i = 0; i < n; ++i) { 408 data::Serialization<BufferBuilder, T>::Serialize(value[i], sendb); 409 } 410 size_t send_size = sendb.size(), recv_size; 411 SyncSendRecv(&send_size, sizeof(send_size), 412 &recv_size, sizeof(recv_size)); 413 // receives message 414 Buffer recvb(recv_size); 415 SyncSendRecv(sendb.data(), sendb.size(), 416 recvb.data(), recv_size); 417 BufferReader br(recvb); 418 for (size_t i = 0; i < n; ++i) { 419 out_value[i] = data::Serialization<BufferReader, T>::Deserialize(br); 420 } 421 } 422 423 template <typename T> 424 typename std::enable_if< 425 !std::is_pod<T>::value && 426 !data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type ReceiveSend(const T & value,T * out_value)427 ReceiveSend(const T& value, T* out_value) { 428 if (self_verify_ && is_loopback_) { 429 // for communication verification, send/receive hash_code. 430 size_t send_hash_code = typeid(T).hash_code(), recv_hash_code; 431 SyncRecvSend(&send_hash_code, sizeof(send_hash_code), 432 &recv_hash_code, sizeof(recv_hash_code)); 433 if (recv_hash_code != typeid(T).hash_code()) { 434 throw std::runtime_error( 435 "Connection::SendReceive() attempted to receive item " 436 "with different typeid!"); 437 } 438 } 439 // variable length items must be prefixed with size header 440 BufferBuilder sendb; 441 data::Serialization<BufferBuilder, T>::Serialize(value, sendb); 442 size_t send_size = sendb.size(), recv_size; 443 SyncRecvSend(&send_size, sizeof(send_size), 444 &recv_size, sizeof(recv_size)); 445 // receives message 446 Buffer recvb(recv_size); 447 SyncRecvSend(sendb.data(), sendb.size(), 448 recvb.data(), recv_size); 449 BufferReader br(recvb); 450 *out_value = data::Serialization<BufferReader, T>::Deserialize(br); 451 } 452 453 //! \} 454 455 //! \name SendN Functions 456 //! \{ 457 458 //! Send an array of serializable POD items T. if sending fails, a net::Exception is 459 //! thrown. 460 template <typename T> 461 typename std::enable_if<std::is_pod<T>::value, void>::type SendN(const T * value,size_t n)462 SendN(const T* value, size_t n) { 463 if (self_verify_ && is_loopback_) { 464 // for communication verification, send hash_code. 465 size_t hash_code = typeid(T).hash_code(); 466 SyncSend(&hash_code, sizeof(hash_code)); 467 } 468 // send PODs directly from memory. 469 SyncSend(value, n * sizeof(T)); 470 } 471 472 //! Send an array of serializable non-POD fixed-length items T. if sending fails, a 473 //! net::Exception is thrown. 474 template <typename T> 475 typename std::enable_if< 476 !std::is_pod<T>::value && 477 data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type SendN(const T * value,size_t n)478 SendN(const T* value, size_t n) { 479 if (self_verify_ && is_loopback_) { 480 // for communication verification, send hash_code. 481 size_t hash_code = typeid(T).hash_code(); 482 SyncSend(&hash_code, sizeof(hash_code)); 483 } 484 // fixed_size items can be sent without size header 485 static constexpr size_t fixed_size 486 = data::Serialization<BufferBuilder, T>::fixed_size; 487 BufferBuilder bb(n * fixed_size); 488 for (size_t i = 0; i < n; ++i) { 489 data::Serialization<BufferBuilder, T>::Serialize(value[i], bb); 490 } 491 SyncSend(bb.data(), bb.size()); 492 } 493 494 //! Send an array of serializable non-POD variable-length items T. if sending fails, 495 //! a net::Exception is thrown. 496 template <typename T> 497 typename std::enable_if< 498 !std::is_pod<T>::value && 499 !data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type SendN(const T * value,size_t n)500 SendN(const T* value, size_t n) { 501 if (self_verify_ && is_loopback_) { 502 // for communication verification, send hash_code. 503 size_t hash_code = typeid(T).hash_code(); 504 SyncSend(&hash_code, sizeof(hash_code)); 505 } 506 // variable length items must be prefixed with size header 507 BufferBuilder bb; 508 for (size_t i = 0; i < n; ++i) { 509 data::Serialization<BufferBuilder, T>::Serialize(value[i], bb); 510 } 511 size_t size = bb.size(); 512 SyncSend(&size, sizeof(size), MsgMore); 513 SyncSend(bb.data(), bb.size()); 514 } 515 516 //! \} 517 518 //! \name ReceiveN Functions 519 //! \{ 520 521 //! Receive an array of serializable POD items T. 522 template <typename T> 523 typename std::enable_if<std::is_pod<T>::value, void>::type ReceiveN(T * out_value,size_t n)524 ReceiveN(T* out_value, size_t n) { 525 if (self_verify_ && is_loopback_) { 526 // for communication verification, receive hash_code. 527 size_t hash_code; 528 SyncRecv(&hash_code, sizeof(hash_code)); 529 if (hash_code != typeid(T).hash_code()) { 530 throw std::runtime_error( 531 "Connection::ReceiveN() attempted to receive item " 532 "with different typeid!"); 533 } 534 } 535 // receive PODs directly into memory. 536 SyncRecv(out_value, n * sizeof(T)); 537 } 538 539 //! Receive an array of serializable non-POD fixed-length items T. 540 template <typename T> 541 typename std::enable_if< 542 !std::is_pod<T>::value && 543 data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type ReceiveN(T * out_value,size_t n)544 ReceiveN(T* out_value, size_t n) { 545 if (self_verify_ && is_loopback_) { 546 // for communication verification, receive hash_code. 547 size_t hash_code; 548 SyncRecv(&hash_code, sizeof(hash_code)); 549 if (hash_code != typeid(T).hash_code()) { 550 throw std::runtime_error( 551 "Connection::ReceiveN() attempted to receive item " 552 "with different typeid!"); 553 } 554 } 555 // fixed_size items can be received without size header 556 Buffer b(n * data::Serialization<BufferBuilder, T>::fixed_size); 557 SyncRecv(b.data(), b.size()); 558 BufferReader br(b); 559 for (size_t i = 0; i < n; ++i) { 560 out_value[i] = data::Serialization<BufferReader, T>::Deserialize(br); 561 } 562 } 563 564 //! Receive an array of serializable non-POD fixed-length items T. 565 template <typename T> 566 typename std::enable_if< 567 !std::is_pod<T>::value && 568 !data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type ReceiveN(T * out_value,size_t n)569 ReceiveN(T* out_value, size_t n) { 570 if (self_verify_ && is_loopback_) { 571 // for communication verification, receive hash_code. 572 size_t hash_code; 573 SyncRecv(&hash_code, sizeof(hash_code)); 574 if (hash_code != typeid(T).hash_code()) { 575 throw std::runtime_error( 576 "Connection::ReceiveN() attempted to receive item " 577 "with different typeid!"); 578 } 579 } 580 // variable length items are prefixed with size header 581 size_t size; 582 SyncRecv(&size, sizeof(size)); 583 // receives message 584 Buffer b(size); 585 SyncRecv(b.data(), size); 586 BufferReader br(b); 587 for (size_t i = 0; i < n; ++i) { 588 out_value[i] = data::Serialization<BufferReader, T>::Deserialize(br); 589 } 590 } 591 592 //! \} 593 594 //! \name Sequence Numbers 595 //! \{ 596 597 //! send sequence 598 std::atomic<uint32_t> tx_seq_ { 0 }; 599 600 //! receive sequence 601 std::atomic<uint32_t> rx_seq_ { 0 }; 602 603 //! \} 604 605 //! \name Statistics 606 //! { 607 608 //! sent bytes 609 std::atomic<size_t> tx_bytes_ { 0 }; 610 611 //! received bytes 612 std::atomic<size_t> rx_bytes_ = { 0 }; 613 614 //! previous read of sent bytes 615 size_t prev_tx_bytes_ = 0; 616 617 //! previous read of received bytes 618 size_t prev_rx_bytes_ = 0; 619 620 //! active send requests 621 std::atomic<size_t> tx_active_ { 0 }; 622 623 //! active recv requests 624 std::atomic<size_t> rx_active_ = { 0 }; 625 626 //! } 627 628 //! make ostreamable operator <<(std::ostream & os,const Connection & c)629 friend std::ostream& operator << (std::ostream& os, const Connection& c) { 630 return c.OutputOstream(os); 631 } 632 }; 633 634 // \} 635 636 } // namespace net 637 } // namespace thrill 638 639 #endif // !THRILL_NET_CONNECTION_HEADER 640 641 /******************************************************************************/ 642