1 /*******************************************************************************
2  * thrill/net/connection.hpp
3  *
4  * Contains net::Connection, a richer set of network point-to-point primitives.
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2015 Timo Bingmann <tb@panthema.net>
9  * Copyright (C) 2015 Emanuel Jöbstl <emanuel.joebstl@gmail.com>
10  *
11  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
12  ******************************************************************************/
13 
14 #pragma once
15 #ifndef THRILL_NET_CONNECTION_HEADER
16 #define THRILL_NET_CONNECTION_HEADER
17 
18 #include <thrill/common/config.hpp>
19 #include <thrill/common/logger.hpp>
20 #include <thrill/common/porting.hpp>
21 #include <thrill/data/serialization.hpp>
22 #include <thrill/net/buffer_builder.hpp>
23 #include <thrill/net/buffer_reader.hpp>
24 #include <thrill/net/exception.hpp>
25 #include <thrill/net/fixed_buffer_builder.hpp>
26 
27 #include <array>
28 #include <cassert>
29 #include <cerrno>
30 #include <cstdio>
31 #include <cstring>
32 #include <stdexcept>
33 #include <string>
34 
35 namespace thrill {
36 namespace net {
37 
38 //! \addtogroup net_layer
39 //! \{
40 
41 /*!
42  * A Connection represents a link to another peer in a network group. The link
43  * need not be an actual stateful TCP connection, but may be reliable and
44  * stateless.
45  *
46  * The Connection class is abstract, and subclasses must exist for every network
47  * implementation.
48  */
49 class Connection
50 {
51 public:
52     //! flag which enables transmission of verification bytes for debugging,
53     //! this increases network volume.
54     static constexpr bool self_verify_ = common::g_self_verify;
55 
56     //! typeid().hash_code() is only guaranteed to be equal for the same program
57     //! run, hence, we can only use it on loopback networks.
58     bool is_loopback_ = false;
59 
60     //! Additional flags for sending or receiving.
61     enum Flags : size_t {
62         NoFlags = 0,
63         //! indicate that more data is coming, hence, sending a packet may be
64         //! delayed. currently only applies to TCP.
65         MsgMore = 1
66     };
67 
68     //! operator to combine flags
operator |(const Flags & a,const Flags & b)69     friend inline Flags operator | (const Flags& a, const Flags& b) {
70         return static_cast<Flags>(
71             static_cast<size_t>(a) | static_cast<size_t>(b));
72     }
73 
74     //! \name Base Status Functions
75     //! \{
76 
77     //! check whether the connection is (still) valid.
78     virtual bool IsValid() const = 0;
79 
80     //! return a string representation of this connection, for user output.
81     virtual std::string ToString() const = 0;
82 
83     //! virtual method to output to a std::ostream
84     virtual std::ostream& OutputOstream(std::ostream& os) const = 0;
85 
86     //! \}
87 
88     //! \name Send Functions
89     //! \{
90 
91     //! Synchronous blocking send of the (data,size) packet. if sending fails, a
92     //! net::Exception is thrown.
93     virtual void SyncSend(const void* data, size_t size,
94                           Flags flags = NoFlags) = 0;
95 
96     //! Non-blocking send of a (data,size) message. returns number of bytes
97     //! possible to send. check errno for errors.
98     virtual ssize_t SendOne(const void* data, size_t size,
99                             Flags flags = NoFlags) = 0;
100 
101     //! Send any serializable POD item T. if sending fails, a net::Exception is
102     //! thrown.
103     template <typename T>
104     typename std::enable_if<std::is_pod<T>::value, void>::type
Send(const T & value)105     Send(const T& value) {
106         if (self_verify_ && is_loopback_) {
107             // for communication verification, send hash_code.
108             size_t hash_code = typeid(T).hash_code();
109             SyncSend(&hash_code, sizeof(hash_code));
110         }
111         // send PODs directly from memory.
112         SyncSend(&value, sizeof(value));
113     }
114 
115     //! Send any serializable non-POD fixed-length item T. if sending fails, a
116     //! net::Exception is thrown.
117     template <typename T>
118     typename std::enable_if<
119         !std::is_pod<T>::value &&
120         data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type
Send(const T & value)121     Send(const T& value) {
122         if (self_verify_ && is_loopback_) {
123             // for communication verification, send hash_code.
124             size_t hash_code = typeid(T).hash_code();
125             SyncSend(&hash_code, sizeof(hash_code));
126         }
127         // fixed_size items can be sent without size header
128         static constexpr size_t fixed_size
129             = data::Serialization<BufferBuilder, T>::fixed_size;
130         if (fixed_size < 2 * 1024 * 1024) {
131             // allocate buffer on stack (no allocation)
132             using FixedBuilder = FixedBufferBuilder<fixed_size>;
133             FixedBuilder fb;
134             data::Serialization<FixedBuilder, T>::Serialize(value, fb);
135             assert(fb.size() == fixed_size);
136             SyncSend(fb.data(), fb.size());
137         }
138         else {
139             // too big, use heap allocation
140             BufferBuilder bb;
141             data::Serialization<BufferBuilder, T>::Serialize(value, bb);
142             SyncSend(bb.data(), bb.size());
143         }
144     }
145 
146     //! Send any serializable non-POD variable-length item T. if sending fails,
147     //! a net::Exception is thrown.
148     template <typename T>
149     typename std::enable_if<
150         !std::is_pod<T>::value &&
151         !data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type
Send(const T & value)152     Send(const T& value) {
153         if (self_verify_ && is_loopback_) {
154             // for communication verification, send hash_code.
155             size_t hash_code = typeid(T).hash_code();
156             SyncSend(&hash_code, sizeof(hash_code));
157         }
158         // variable length items must be prefixed with size header
159         BufferBuilder bb;
160         data::Serialization<BufferBuilder, T>::Serialize(value, bb);
161         size_t size = bb.size();
162         SyncSend(&size, sizeof(size), MsgMore);
163         SyncSend(bb.data(), bb.size());
164     }
165 
166     //! \}
167 
168     //! \name Receive Functions
169     //! \{
170 
171     //! Synchronous blocking receive a message of given size. The size must
172     //! match the SyncSend size for some network layers may only support
173     //! matching messages (read: RDMA!, but also true for the mock net). Throws
174     //! a net::Exception on errors.
175     virtual void SyncRecv(void* out_data, size_t size) = 0;
176 
177     //! Non-blocking receive of at most size data. returns number of bytes
178     //! actually received. check errno for errors.
179     virtual ssize_t RecvOne(void* out_data, size_t size) = 0;
180 
181     //! Receive any serializable POD item T.
182     template <typename T>
183     typename std::enable_if<std::is_pod<T>::value, void>::type
Receive(T * out_value)184     Receive(T* out_value) {
185         if (self_verify_ && is_loopback_) {
186             // for communication verification, receive hash_code.
187             size_t hash_code;
188             SyncRecv(&hash_code, sizeof(hash_code));
189             if (hash_code != typeid(T).hash_code()) {
190                 throw std::runtime_error(
191                           "Connection::Receive() attempted to receive item "
192                           "with different typeid!");
193             }
194         }
195         // receive PODs directly into memory.
196         SyncRecv(out_value, sizeof(*out_value));
197     }
198 
199     //! Receive any serializable non-POD fixed-length item T.
200     template <typename T>
201     typename std::enable_if<
202         !std::is_pod<T>::value &&
203         data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type
Receive(T * out_value)204     Receive(T* out_value) {
205         if (self_verify_ && is_loopback_) {
206             // for communication verification, receive hash_code.
207             size_t hash_code;
208             SyncRecv(&hash_code, sizeof(hash_code));
209             if (hash_code != typeid(T).hash_code()) {
210                 throw std::runtime_error(
211                           "Connection::Receive() attempted to receive item "
212                           "with different typeid!");
213             }
214         }
215         // fixed_size items can be received without size header
216         static constexpr size_t fixed_size
217             = data::Serialization<BufferBuilder, T>::fixed_size;
218         if (fixed_size < 2 * 1024 * 1024) {
219             // allocate buffer on stack (no allocation)
220             std::array<uint8_t, fixed_size> b;
221             SyncRecv(b.data(), b.size());
222             BufferReader br(b.data(), b.size());
223             *out_value = data::Serialization<BufferReader, T>::Deserialize(br);
224         }
225         else {
226             // too big, use heap allocation
227             Buffer b(data::Serialization<BufferBuilder, T>::fixed_size);
228             SyncRecv(b.data(), b.size());
229             BufferReader br(b);
230             *out_value = data::Serialization<BufferReader, T>::Deserialize(br);
231         }
232     }
233 
234     //! Receive any serializable non-POD fixed-length item T.
235     template <typename T>
236     typename std::enable_if<
237         !std::is_pod<T>::value &&
238         !data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type
Receive(T * out_value)239     Receive(T* out_value) {
240         if (self_verify_ && is_loopback_) {
241             // for communication verification, receive hash_code.
242             size_t hash_code;
243             SyncRecv(&hash_code, sizeof(hash_code));
244             if (hash_code != typeid(T).hash_code()) {
245                 throw std::runtime_error(
246                           "Connection::Receive() attempted to receive item "
247                           "with different typeid!");
248             }
249         }
250         // variable length items are prefixed with size header
251         size_t size;
252         SyncRecv(&size, sizeof(size));
253         // receives message
254         Buffer b(size);
255         SyncRecv(b.data(), size);
256         BufferReader br(b);
257         *out_value = data::Serialization<BufferReader, T>::Deserialize(br);
258     }
259 
260     //! \}
261 
262     //! \name Paired SendReceive Methods
263     //! \{
264 
265     //! Synchronous blocking sending and receive a message of given size. The
266     //! size must match the SyncSendRecv size for some network layers may only
267     //! support matching messages (read: RDMA!, but also true for the mock
268     //! net). Throws a net::Exception on errors.
269     virtual void SyncSendRecv(const void* send_data, size_t send_size,
270                               void* recv_data, size_t recv_size) = 0;
271     virtual void SyncRecvSend(const void* send_data, size_t send_size,
272                               void* recv_data, size_t recv_size) = 0;
273 
274     //! SendReceive any serializable POD item T.
275     template <typename T>
276     typename std::enable_if<std::is_pod<T>::value, void>::type
SendReceive(const T * value,T * out_value,size_t n=1)277     SendReceive(const T* value, T* out_value, size_t n = 1) {
278         if (self_verify_ && is_loopback_) {
279             // for communication verification, send/receive hash_code.
280             size_t send_hash_code = typeid(T).hash_code(), recv_hash_code;
281             SyncSendRecv(&send_hash_code, sizeof(send_hash_code),
282                          &recv_hash_code, sizeof(recv_hash_code));
283             if (recv_hash_code != typeid(T).hash_code()) {
284                 throw std::runtime_error(
285                           "Connection::SendReceive() attempted to receive item "
286                           "with different typeid!");
287             }
288         }
289 
290         // receive PODs directly into memory.
291         SyncSendRecv(value, n * sizeof(T), out_value, n * sizeof(T));
292     }
293 
294     template <typename T>
295     typename std::enable_if<std::is_pod<T>::value, void>::type
ReceiveSend(const T & value,T * out_value)296     ReceiveSend(const T& value, T* out_value) {
297         if (self_verify_ && is_loopback_) {
298             // for communication verification, send/receive hash_code.
299             size_t send_hash_code = typeid(T).hash_code(), recv_hash_code;
300             SyncRecvSend(&send_hash_code, sizeof(send_hash_code),
301                          &recv_hash_code, sizeof(recv_hash_code));
302             if (recv_hash_code != typeid(T).hash_code()) {
303                 throw std::runtime_error(
304                           "Connection::SendReceive() attempted to receive item "
305                           "with different typeid!");
306             }
307         }
308         // receive PODs directly into memory.
309         SyncRecvSend(&value, sizeof(value), out_value, sizeof(*out_value));
310     }
311 
312     //! SendReceive any serializable non-POD fixed-length item T.
313     template <typename T>
314     typename std::enable_if<
315         !std::is_pod<T>::value &&
316         data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type
SendReceive(const T * value,T * out_value,size_t n=1)317     SendReceive(const T* value, T* out_value, size_t n = 1) {
318         if (self_verify_ && is_loopback_) {
319             // for communication verification, send/receive hash_code.
320             size_t send_hash_code = typeid(T).hash_code(), recv_hash_code;
321             SyncSendRecv(&send_hash_code, sizeof(send_hash_code),
322                          &recv_hash_code, sizeof(recv_hash_code));
323             if (recv_hash_code != typeid(T).hash_code()) {
324                 throw std::runtime_error(
325                           "Connection::SendReceive() attempted to receive item "
326                           "with different typeid!");
327             }
328         }
329 
330         // fixed_size items can be sent/recv without size header
331         BufferBuilder sendb(n * data::Serialization<BufferBuilder, T>::fixed_size);
332         for (size_t i = 0; i < n; ++i) {
333             data::Serialization<BufferBuilder, T>::Serialize(value[i], sendb);
334         }
335         Buffer recvb(n * data::Serialization<BufferBuilder, T>::fixed_size);
336         SyncSendRecv(sendb.data(), sendb.size(),
337                      recvb.data(), recvb.size());
338         BufferReader br(recvb);
339         for (size_t i = 0; i < n; ++i) {
340             out_value[i] = data::Serialization<BufferReader, T>::Deserialize(br);
341         }
342     }
343 
344     template <typename T>
345     typename std::enable_if<
346         !std::is_pod<T>::value &&
347         data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type
ReceiveSend(const T & value,T * out_value)348     ReceiveSend(const T& value, T* out_value) {
349         if (self_verify_ && is_loopback_) {
350             // for communication verification, send/receive hash_code.
351             size_t send_hash_code = typeid(T).hash_code(), recv_hash_code;
352             SyncRecvSend(&send_hash_code, sizeof(send_hash_code),
353                          &recv_hash_code, sizeof(recv_hash_code));
354             if (recv_hash_code != typeid(T).hash_code()) {
355                 throw std::runtime_error(
356                           "Connection::SendReceive() attempted to receive item "
357                           "with different typeid!");
358             }
359         }
360 
361         // fixed_size items can be sent/recv without size header
362         static constexpr size_t fixed_size
363             = data::Serialization<BufferBuilder, T>::fixed_size;
364         if (fixed_size < 2 * 1024 * 1024) {
365             // allocate buffer on stack (no allocation)
366             using FixedBuilder = FixedBufferBuilder<fixed_size>;
367             FixedBuilder sendb;
368             data::Serialization<FixedBuilder, T>::Serialize(value, sendb);
369             assert(sendb.size() == fixed_size);
370             std::array<uint8_t, fixed_size> recvb;
371             SyncRecvSend(sendb.data(), sendb.size(),
372                          recvb.data(), recvb.size());
373             BufferReader br(recvb.data(), recvb.size());
374             *out_value = data::Serialization<BufferReader, T>::Deserialize(br);
375         }
376         else {
377             // too big, use heap allocation
378             BufferBuilder sendb;
379             data::Serialization<BufferBuilder, T>::Serialize(value, sendb);
380             Buffer recvb(data::Serialization<BufferBuilder, T>::fixed_size);
381             SyncRecvSend(sendb.data(), sendb.size(),
382                          recvb.data(), recvb.size());
383             BufferReader br(recvb);
384             *out_value = data::Serialization<BufferReader, T>::Deserialize(br);
385         }
386     }
387 
388     //! SendReceive any serializable non-POD fixed-length item T.
389     template <typename T>
390     typename std::enable_if<
391         !std::is_pod<T>::value &&
392         !data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type
SendReceive(const T * value,T * out_value,size_t n=1)393     SendReceive(const T* value, T* out_value, size_t n = 1) {
394         if (self_verify_ && is_loopback_) {
395             // for communication verification, send/receive hash_code.
396             size_t send_hash_code = typeid(T).hash_code(), recv_hash_code;
397             SyncSendRecv(&send_hash_code, sizeof(send_hash_code),
398                          &recv_hash_code, sizeof(recv_hash_code));
399             if (recv_hash_code != typeid(T).hash_code()) {
400                 throw std::runtime_error(
401                           "Connection::SendReceive() attempted to receive item "
402                           "with different typeid!");
403             }
404         }
405         // variable length items must be prefixed with size header
406         BufferBuilder sendb;
407         for (size_t i = 0; i < n; ++i) {
408             data::Serialization<BufferBuilder, T>::Serialize(value[i], sendb);
409         }
410         size_t send_size = sendb.size(), recv_size;
411         SyncSendRecv(&send_size, sizeof(send_size),
412                      &recv_size, sizeof(recv_size));
413         // receives message
414         Buffer recvb(recv_size);
415         SyncSendRecv(sendb.data(), sendb.size(),
416                      recvb.data(), recv_size);
417         BufferReader br(recvb);
418         for (size_t i = 0; i < n; ++i) {
419             out_value[i] = data::Serialization<BufferReader, T>::Deserialize(br);
420         }
421     }
422 
423     template <typename T>
424     typename std::enable_if<
425         !std::is_pod<T>::value &&
426         !data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type
ReceiveSend(const T & value,T * out_value)427     ReceiveSend(const T& value, T* out_value) {
428         if (self_verify_ && is_loopback_) {
429             // for communication verification, send/receive hash_code.
430             size_t send_hash_code = typeid(T).hash_code(), recv_hash_code;
431             SyncRecvSend(&send_hash_code, sizeof(send_hash_code),
432                          &recv_hash_code, sizeof(recv_hash_code));
433             if (recv_hash_code != typeid(T).hash_code()) {
434                 throw std::runtime_error(
435                           "Connection::SendReceive() attempted to receive item "
436                           "with different typeid!");
437             }
438         }
439         // variable length items must be prefixed with size header
440         BufferBuilder sendb;
441         data::Serialization<BufferBuilder, T>::Serialize(value, sendb);
442         size_t send_size = sendb.size(), recv_size;
443         SyncRecvSend(&send_size, sizeof(send_size),
444                      &recv_size, sizeof(recv_size));
445         // receives message
446         Buffer recvb(recv_size);
447         SyncRecvSend(sendb.data(), sendb.size(),
448                      recvb.data(), recv_size);
449         BufferReader br(recvb);
450         *out_value = data::Serialization<BufferReader, T>::Deserialize(br);
451     }
452 
453     //! \}
454 
455     //! \name SendN Functions
456     //! \{
457 
458     //! Send an array of serializable POD items T. if sending fails, a net::Exception is
459     //! thrown.
460     template <typename T>
461     typename std::enable_if<std::is_pod<T>::value, void>::type
SendN(const T * value,size_t n)462     SendN(const T* value, size_t n) {
463         if (self_verify_ && is_loopback_) {
464             // for communication verification, send hash_code.
465             size_t hash_code = typeid(T).hash_code();
466             SyncSend(&hash_code, sizeof(hash_code));
467         }
468         // send PODs directly from memory.
469         SyncSend(value, n * sizeof(T));
470     }
471 
472     //! Send an array of serializable non-POD fixed-length items T. if sending fails, a
473     //! net::Exception is thrown.
474     template <typename T>
475     typename std::enable_if<
476         !std::is_pod<T>::value &&
477         data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type
SendN(const T * value,size_t n)478     SendN(const T* value, size_t n) {
479         if (self_verify_ && is_loopback_) {
480             // for communication verification, send hash_code.
481             size_t hash_code = typeid(T).hash_code();
482             SyncSend(&hash_code, sizeof(hash_code));
483         }
484         // fixed_size items can be sent without size header
485         static constexpr size_t fixed_size
486             = data::Serialization<BufferBuilder, T>::fixed_size;
487         BufferBuilder bb(n * fixed_size);
488         for (size_t i = 0; i < n; ++i) {
489             data::Serialization<BufferBuilder, T>::Serialize(value[i], bb);
490         }
491         SyncSend(bb.data(), bb.size());
492     }
493 
494     //! Send an array of serializable non-POD variable-length items T. if sending fails,
495     //! a net::Exception is thrown.
496     template <typename T>
497     typename std::enable_if<
498         !std::is_pod<T>::value &&
499         !data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type
SendN(const T * value,size_t n)500     SendN(const T* value, size_t n) {
501         if (self_verify_ && is_loopback_) {
502             // for communication verification, send hash_code.
503             size_t hash_code = typeid(T).hash_code();
504             SyncSend(&hash_code, sizeof(hash_code));
505         }
506         // variable length items must be prefixed with size header
507         BufferBuilder bb;
508         for (size_t i = 0; i < n; ++i) {
509             data::Serialization<BufferBuilder, T>::Serialize(value[i], bb);
510         }
511         size_t size = bb.size();
512         SyncSend(&size, sizeof(size), MsgMore);
513         SyncSend(bb.data(), bb.size());
514     }
515 
516     //! \}
517 
518     //! \name ReceiveN Functions
519     //! \{
520 
521     //! Receive an array of serializable POD items T.
522     template <typename T>
523     typename std::enable_if<std::is_pod<T>::value, void>::type
ReceiveN(T * out_value,size_t n)524     ReceiveN(T* out_value, size_t n) {
525         if (self_verify_ && is_loopback_) {
526             // for communication verification, receive hash_code.
527             size_t hash_code;
528             SyncRecv(&hash_code, sizeof(hash_code));
529             if (hash_code != typeid(T).hash_code()) {
530                 throw std::runtime_error(
531                           "Connection::ReceiveN() attempted to receive item "
532                           "with different typeid!");
533             }
534         }
535         // receive PODs directly into memory.
536         SyncRecv(out_value, n * sizeof(T));
537     }
538 
539     //! Receive an array of serializable non-POD fixed-length items T.
540     template <typename T>
541     typename std::enable_if<
542         !std::is_pod<T>::value &&
543         data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type
ReceiveN(T * out_value,size_t n)544     ReceiveN(T* out_value, size_t n) {
545         if (self_verify_ && is_loopback_) {
546             // for communication verification, receive hash_code.
547             size_t hash_code;
548             SyncRecv(&hash_code, sizeof(hash_code));
549             if (hash_code != typeid(T).hash_code()) {
550                 throw std::runtime_error(
551                           "Connection::ReceiveN() attempted to receive item "
552                           "with different typeid!");
553             }
554         }
555         // fixed_size items can be received without size header
556         Buffer b(n * data::Serialization<BufferBuilder, T>::fixed_size);
557         SyncRecv(b.data(), b.size());
558         BufferReader br(b);
559         for (size_t i = 0; i < n; ++i) {
560             out_value[i] = data::Serialization<BufferReader, T>::Deserialize(br);
561         }
562     }
563 
564     //! Receive an array of serializable non-POD fixed-length items T.
565     template <typename T>
566     typename std::enable_if<
567         !std::is_pod<T>::value &&
568         !data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type
ReceiveN(T * out_value,size_t n)569     ReceiveN(T* out_value, size_t n) {
570         if (self_verify_ && is_loopback_) {
571             // for communication verification, receive hash_code.
572             size_t hash_code;
573             SyncRecv(&hash_code, sizeof(hash_code));
574             if (hash_code != typeid(T).hash_code()) {
575                 throw std::runtime_error(
576                           "Connection::ReceiveN() attempted to receive item "
577                           "with different typeid!");
578             }
579         }
580         // variable length items are prefixed with size header
581         size_t size;
582         SyncRecv(&size, sizeof(size));
583         // receives message
584         Buffer b(size);
585         SyncRecv(b.data(), size);
586         BufferReader br(b);
587         for (size_t i = 0; i < n; ++i) {
588             out_value[i] = data::Serialization<BufferReader, T>::Deserialize(br);
589         }
590     }
591 
592     //! \}
593 
594     //! \name Sequence Numbers
595     //! \{
596 
597     //! send sequence
598     std::atomic<uint32_t> tx_seq_ { 0 };
599 
600     //! receive sequence
601     std::atomic<uint32_t> rx_seq_ { 0 };
602 
603     //! \}
604 
605     //! \name Statistics
606     //! {
607 
608     //! sent bytes
609     std::atomic<size_t> tx_bytes_ { 0 };
610 
611     //! received bytes
612     std::atomic<size_t> rx_bytes_ = { 0 };
613 
614     //! previous read of sent bytes
615     size_t prev_tx_bytes_ = 0;
616 
617     //! previous read of received bytes
618     size_t prev_rx_bytes_ = 0;
619 
620     //! active send requests
621     std::atomic<size_t> tx_active_ { 0 };
622 
623     //! active recv requests
624     std::atomic<size_t> rx_active_ = { 0 };
625 
626     //! }
627 
628     //! make ostreamable
operator <<(std::ostream & os,const Connection & c)629     friend std::ostream& operator << (std::ostream& os, const Connection& c) {
630         return c.OutputOstream(os);
631     }
632 };
633 
634 //! \}
635 
636 } // namespace net
637 } // namespace thrill
638 
639 #endif // !THRILL_NET_CONNECTION_HEADER
640 
641 /******************************************************************************/
642