/*
 * Net2.actor.cpp
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "flow/Platform.h"
#include <algorithm>
#define BOOST_SYSTEM_NO_LIB
#define BOOST_DATE_TIME_NO_LIB
#define BOOST_REGEX_NO_LIB
#include "boost/asio.hpp"
#include "boost/bind.hpp"
#include "boost/date_time/posix_time/posix_time_types.hpp"
#include "flow/network.h"
#include "flow/IThreadPool.h"
#include "boost/range.hpp"

#include "flow/ActorCollection.h"
#include "flow/ThreadSafeQueue.h"
#include "flow/ThreadHelper.actor.h"
#include "flow/TDMetric.actor.h"
#include "flow/AsioReactor.h"
#include "flow/Profiler.h"

#ifdef WIN32
#include <mmsystem.h>
#endif
#include "flow/actorcompiler.h"  // This must be the last #include.

// Defined to track the stack limit
extern "C" intptr_t g_stackYieldLimit;
intptr_t g_stackYieldLimit = 0;

using namespace boost::asio::ip;

// These impact both communications and the deserialization of certain database and IKeyValueStore keys.
//
// The convention is that 'x' and 'y' should match the major and minor version of the software, and 'z' should be 0.
// To make a change without a corresponding increase to the x.y version, increment the 'dev' digit.
//
//                                                       xyzdev
//                                                       vvvv
const uint64_t currentProtocolVersion        = 0x0FDB00B061060001LL;
const uint64_t compatibleProtocolVersionMask = 0xffffffffffff0000LL;
const uint64_t minValidProtocolVersion       = 0x0FDB00A200060001LL;

// This assert is intended to help prevent incrementing the leftmost digits accidentally. It will probably need to change when we reach version 10.
static_assert(currentProtocolVersion < 0x0FDB00B100000000LL, "Unexpected protocol version");

#if defined(__linux__)
#include <execinfo.h>

volatile double net2liveness = 0;

volatile size_t net2backtraces_max = 10000;
volatile void** volatile net2backtraces = NULL;
volatile size_t net2backtraces_offset = 0;
volatile bool net2backtraces_overflow = false;
volatile int net2backtraces_count = 0;

volatile void **other_backtraces = NULL;
sigset_t sigprof_set;


void initProfiling() {
	net2backtraces = new volatile void*[net2backtraces_max];
	other_backtraces = new volatile void*[net2backtraces_max];

	// According to folk wisdom, calling this once before setting up the signal handler makes
	// it async signal safe in practice :-/
	backtrace(const_cast<void**>(other_backtraces), net2backtraces_max);

	sigemptyset(&sigprof_set);
	sigaddset(&sigprof_set, SIGPROF);
}
#endif

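// Fields of the Net2.SlowTask event metric; they are filled in and emitted via
// slowTaskMetric->log() in Net2::checkForSlowTask() when a task overruns its TSC budget.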
DESCR struct SlowTask {
	int64_t clocks; //clocks
	int64_t duration; // ns
	int64_t priority; // priority level
	int64_t numYields; // count
};

namespace N2 {  // No indent, it's the whole file

class Net2;
class Peer;
class Connection;

Net2 *g_net2 = 0;

class Task {
public:
	virtual void operator()() = 0;
};

struct OrderedTask {
	int64_t priority;
	int taskID;
	Task *task;
	OrderedTask(int64_t priority, int taskID, Task* task) : priority(priority), taskID(taskID), task(task) {}
	bool operator < (OrderedTask const& rhs) const { return priority < rhs.priority; }
};
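// Throughout this file an OrderedTask's 64-bit priority packs the task's priority class into the
// high 32 bits and subtracts a monotonically increasing issue counter in the low bits, e.g.
//     int64_t priority = (int64_t(taskID) << 32) - (++tasksIssued);
// so the ready queue pops the highest priority class first and is FIFO within a class.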

thread_local INetwork* thread_network = 0;

class Net2 sealed : public INetwork, public INetworkConnections {

public:
	Net2(bool useThreadPool, bool useMetrics);
	void run();
	void initMetrics();

	// INetworkConnections interface
	virtual Future<Reference<IConnection>> connect( NetworkAddress toAddr, std::string host );
	virtual Future<std::vector<NetworkAddress>> resolveTCPEndpoint( std::string host, std::string service);
	virtual Reference<IListener> listen( NetworkAddress localAddr );

	// INetwork interface
	virtual double now() { return currentTime; };
	virtual Future<Void> delay( double seconds, int taskId );
	virtual Future<class Void> yield( int taskID );
	virtual bool check_yield(int taskId);
	virtual int getCurrentTask() { return currentTaskID; }
	virtual void setCurrentTask(int taskID ) { priorityMetric = currentTaskID = taskID; }
	virtual void onMainThread( Promise<Void>&& signal, int taskID );
	virtual void stop() {
		if ( thread_network == this )
			stopImmediately();
		else
			// SOMEDAY: NULL for deferred error, no analysis of correctness (itp)
			onMainThreadVoid( [this] { this->stopImmediately(); }, NULL );
	}

	virtual bool isSimulated() const { return false; }
	virtual THREAD_HANDLE startThread( THREAD_FUNC_RETURN (*func) (void*), void *arg);

	virtual void getDiskBytes( std::string const& directory, int64_t& free, int64_t& total );
	virtual bool isAddressOnThisHost( NetworkAddress const& addr );
	void updateNow(){ currentTime = timer_monotonic(); }

	virtual flowGlobalType global(int id) { return (globals.size() > id) ? globals[id] : NULL; }
	virtual void setGlobal(size_t id, flowGlobalType v) { globals.resize(std::max(globals.size(),id+1)); globals[id] = v; }
	std::vector<flowGlobalType>		globals;

	bool useThreadPool;
//private:

	ASIOReactor reactor;
	INetworkConnections *network;  // initially this, but can be changed

	int64_t tsc_begin, tsc_end;
	double taskBegin;
	int currentTaskID;
	uint64_t tasksIssued;
	TDMetricCollection tdmetrics;
	double currentTime;
	bool stopped;
	std::map<IPAddress, bool> addressOnHostCache;

	uint64_t numYields;

	double lastPriorityTrackTime;
	int lastMinTaskID;
	double priorityTimer[NetworkMetrics::PRIORITY_BINS];

	std::priority_queue<OrderedTask, std::vector<OrderedTask>> ready;
	ThreadSafeQueue<OrderedTask> threadReady;

	struct DelayedTask : OrderedTask {
		double at;
		DelayedTask(double at, int64_t priority, int taskID, Task* task) : OrderedTask(priority, taskID, task), at(at) {}
		bool operator < (DelayedTask const& rhs) const { return at > rhs.at; } // Ordering is reversed for priority_queue
	};
	std::priority_queue<DelayedTask, std::vector<DelayedTask>> timers;

	void checkForSlowTask(int64_t tscBegin, int64_t tscEnd, double duration, int64_t priority);
	bool check_yield(int taskId, bool isRunLoop);
	void processThreadReady();
	void trackMinPriority( int minTaskID, double now );
	void stopImmediately() {
		stopped=true; decltype(ready) _1; ready.swap(_1); decltype(timers) _2; timers.swap(_2);
	}

	Future<Void> timeOffsetLogger;
	Future<Void> logTimeOffset();

	Int64MetricHandle bytesReceived;
	Int64MetricHandle countWriteProbes;
	Int64MetricHandle countReadProbes;
	Int64MetricHandle countReads;
	Int64MetricHandle countWouldBlock;
	Int64MetricHandle countWrites;
	Int64MetricHandle countRunLoop;
	Int64MetricHandle countCantSleep;
	Int64MetricHandle countWontSleep;
	Int64MetricHandle countTimers;
	Int64MetricHandle countTasks;
	Int64MetricHandle countYields;
	Int64MetricHandle countYieldBigStack;
	Int64MetricHandle countYieldCalls;
	Int64MetricHandle countYieldCallsTrue;
	Int64MetricHandle countASIOEvents;
	Int64MetricHandle countSlowTaskSignals;
	Int64MetricHandle priorityMetric;
	BoolMetricHandle awakeMetric;

	EventMetricHandle<SlowTask> slowTaskMetric;

	std::vector<std::string> blobCredentialFiles;
};

static boost::asio::ip::address tcpAddress(IPAddress const& n) {
	if (n.isV6()) {
		return boost::asio::ip::address_v6(n.toV6());
	} else {
		return boost::asio::ip::address_v4(n.toV4());
	}
}

static tcp::endpoint tcpEndpoint( NetworkAddress const& n ) {
	return tcp::endpoint(tcpAddress(n.ip), n.port);
}

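// Bridges boost::asio completion handlers to flow futures: a BindPromise is passed (by move) as
// the asio handler and completes its Promise<Void> with Void() on success or connection_failed()
// (after logging errContext) when asio invokes operator() with an error.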
class BindPromise {
	Promise<Void> p;
	const char* errContext;
	UID errID;
public:
	BindPromise( const char* errContext, UID errID ) : errContext(errContext), errID(errID) {}
	BindPromise( BindPromise const& r ) : p(r.p), errContext(r.errContext), errID(r.errID) {}
	BindPromise(BindPromise&& r) BOOST_NOEXCEPT : p(std::move(r.p)), errContext(r.errContext), errID(r.errID) {}

	Future<Void> getFuture() { return p.getFuture(); }

	void operator()( const boost::system::error_code& error, size_t bytesWritten=0 ) {
		try {
			if (error) {
				// Log the error...
				TraceEvent(SevWarn, errContext, errID).suppressFor(1.0).detail("Message", error.value());
				p.sendError( connection_failed() );
			} else
				p.send( Void() );
		} catch (Error& e) {
			p.sendError(e);
		} catch (...) {
			p.sendError(unknown_error());
		}
	}
};

class Connection : public IConnection, ReferenceCounted<Connection> {
public:
	virtual void addref() { ReferenceCounted<Connection>::addref(); }
	virtual void delref() { ReferenceCounted<Connection>::delref(); }

	virtual void close() {
		closeSocket();
	}

	explicit Connection( boost::asio::io_service& io_service )
		: id(g_nondeterministic_random->randomUniqueID()), socket(io_service)
	{
	}

	// This is not part of the IConnection interface, because it is wrapped by INetwork::connect()
	ACTOR static Future<Reference<IConnection>> connect( boost::asio::io_service* ios, NetworkAddress addr ) {
		state Reference<Connection> self( new Connection(*ios) );

		self->peer_address = addr;
		try {
			auto to = tcpEndpoint(addr);
			BindPromise p("N2_ConnectError", self->id);
			Future<Void> onConnected = p.getFuture();
			self->socket.async_connect( to, std::move(p) );

			wait( onConnected );
			self->init();
			return self;
		} catch (Error&) {
			// Either the connection failed, or was cancelled by the caller
			self->closeSocket();
			throw;
		}
	}

	// This is not part of the IConnection interface, because it is wrapped by IListener::accept()
	void accept(NetworkAddress peerAddr) {
		this->peer_address = peerAddr;
		init();
	}

	// returns when write() can write at least one byte
	virtual Future<Void> onWritable() {
		++g_net2->countWriteProbes;
		BindPromise p("N2_WriteProbeError", id);
		auto f = p.getFuture();
		socket.async_write_some( boost::asio::null_buffers(), std::move(p) );
		return f;
	}

	// returns when read() can read at least one byte
	virtual Future<Void> onReadable() {
		++g_net2->countReadProbes;
		BindPromise p("N2_ReadProbeError", id);
		auto f = p.getFuture();
		socket.async_read_some( boost::asio::null_buffers(), std::move(p) );
		return f;
	}

	// Reads as many bytes as possible from the read buffer into [begin,end) and returns the number of bytes read (might be 0)
	virtual int read( uint8_t* begin, uint8_t* end ) {
		boost::system::error_code err;
		++g_net2->countReads;
		size_t toRead = end-begin;
		size_t size = socket.read_some( boost::asio::mutable_buffers_1(begin, toRead), err );
		g_net2->bytesReceived += size;
		//TraceEvent("ConnRead", this->id).detail("Bytes", size);
		if (err) {
			if (err == boost::asio::error::would_block) {
				++g_net2->countWouldBlock;
				return 0;
			}
			onReadError(err);
			throw connection_failed();
		}
		ASSERT( size );  // If the socket is closed, we expect an 'eof' error, not a zero return value

		return size;
	}

	// Writes as many bytes as possible from the given SendBuffer chain into the write buffer and returns the number of bytes written (might be 0)
	virtual int write( SendBuffer const* data, int limit ) {
		boost::system::error_code err;
		++g_net2->countWrites;

		size_t sent = socket.write_some( boost::iterator_range<SendBufferIterator>(SendBufferIterator(data, limit), SendBufferIterator()), err );

		if (err) {
			// Since there was an error, sent's value can't be used to infer that the buffer has data and the limit is positive, so check both explicitly.
			ASSERT(limit > 0);
			bool notEmpty = false;
			for(auto p = data; p; p = p->next)
				if(p->bytes_written - p->bytes_sent > 0) {
					notEmpty = true;
					break;
				}
			ASSERT(notEmpty);

			if (err == boost::asio::error::would_block) {
				++g_net2->countWouldBlock;
				return 0;
			}
			onWriteError(err);
			throw connection_failed();
		}

		ASSERT( sent );  // Make sure data was sent; this check will also fail if the buffer chain was empty or the limit was not > 0.
		return sent;
	}

	virtual NetworkAddress getPeerAddress() { return peer_address; }

	virtual UID getDebugID() { return id; }

	tcp::socket& getSocket() { return socket; }
private:
	UID id;
	tcp::socket socket;
	NetworkAddress peer_address;

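	// Adapts a SendBuffer chain to a boost::asio ConstBufferSequence (a forward range of
	// const_buffers) so write_some() can gather-write unsent packet data directly, stopping
	// after 'limit' bytes.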
	struct SendBufferIterator {
		typedef boost::asio::const_buffer value_type;
		typedef std::forward_iterator_tag iterator_category;
		typedef size_t difference_type;
		typedef boost::asio::const_buffer* pointer;
		typedef boost::asio::const_buffer& reference;

		SendBuffer const* p;
		int limit;

		SendBufferIterator(SendBuffer const* p=0, int limit = std::numeric_limits<int>::max()) : p(p), limit(limit) {
			ASSERT(limit > 0);
		}

		bool operator == (SendBufferIterator const& r) const { return p == r.p; }
		bool operator != (SendBufferIterator const& r) const { return p != r.p; }
		void operator++() {
			limit -= p->bytes_written - p->bytes_sent;
			if(limit > 0)
				p = p->next;
			else
				p = NULL;
		}

		boost::asio::const_buffer operator*() const {
			return boost::asio::const_buffer( p->data + p->bytes_sent, std::min(limit, p->bytes_written - p->bytes_sent) );
		}
	};

	void init() {
		// Socket settings that have to be set after connect or accept succeeds
		socket.non_blocking(true);
		socket.set_option(boost::asio::ip::tcp::no_delay(true));
	}

	void closeSocket() {
		boost::system::error_code error;
		socket.close(error);
		if (error)
			TraceEvent(SevWarn, "N2_CloseError", id).suppressFor(1.0).detail("Message", error.value());
	}

	void onReadError( const boost::system::error_code& error ) {
		TraceEvent(SevWarn, "N2_ReadError", id).suppressFor(1.0).detail("Message", error.value());
		closeSocket();
	}
	void onWriteError( const boost::system::error_code& error ) {
		TraceEvent(SevWarn, "N2_WriteError", id).suppressFor(1.0).detail("Message", error.value());
		closeSocket();
	}
};

class Listener : public IListener, ReferenceCounted<Listener> {
	NetworkAddress listenAddress;
	tcp::acceptor acceptor;

public:
	Listener( boost::asio::io_service& io_service, NetworkAddress listenAddress )
		: listenAddress(listenAddress), acceptor( io_service, tcpEndpoint( listenAddress ) )
	{
	}

	virtual void addref() { ReferenceCounted<Listener>::addref(); }
	virtual void delref() { ReferenceCounted<Listener>::delref(); }

	// Returns one incoming connection when it is available
	virtual Future<Reference<IConnection>> accept() {
		return doAccept( this );
	}

	virtual NetworkAddress getListenAddress() { return listenAddress; }

private:
	ACTOR static Future<Reference<IConnection>> doAccept( Listener* self ) {
		state Reference<Connection> conn( new Connection( self->acceptor.get_io_service() ) );
		state tcp::acceptor::endpoint_type peer_endpoint;
		try {
			BindPromise p("N2_AcceptError", UID());
			auto f = p.getFuture();
			self->acceptor.async_accept( conn->getSocket(), peer_endpoint, std::move(p) );
			wait( f );
			auto peer_address = peer_endpoint.address().is_v6() ? IPAddress(peer_endpoint.address().to_v6().to_bytes())
			                                                    : IPAddress(peer_endpoint.address().to_v4().to_ulong());
			conn->accept(NetworkAddress(peer_address, peer_endpoint.port()));

			return conn;
		} catch (...) {
			conn->close();
			throw;
		}
	}
};

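// Adapts a Promise<Void> to the Task interface used by the ready and timer queues; the task
// deletes itself after sending.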
struct PromiseTask : public Task, public FastAllocated<PromiseTask> {
	Promise<Void> promise;
	PromiseTask() {}
	explicit PromiseTask( Promise<Void>&& promise ) BOOST_NOEXCEPT : promise(std::move(promise)) {}

	virtual void operator()() {
		promise.send(Void());
		delete this;
	}
};

Net2::Net2(bool useThreadPool, bool useMetrics)
	: useThreadPool(useThreadPool),
	  network(this),
	  reactor(this),
	  stopped(false),
	  tasksIssued(0),
	  // Until run() is called, yield() will always yield
	  tsc_begin(0), tsc_end(0), taskBegin(0), currentTaskID(TaskDefaultYield),
	  lastMinTaskID(0),
	  numYields(0)
{
	TraceEvent("Net2Starting");

	// Set the global members
	if(useMetrics) {
		setGlobal(INetwork::enTDMetrics, (flowGlobalType) &tdmetrics);
	}
	setGlobal(INetwork::enNetworkConnections, (flowGlobalType) network);
	setGlobal(INetwork::enASIOService, (flowGlobalType) &reactor.ios);
	setGlobal(INetwork::enBlobCredentialFiles, &blobCredentialFiles);

#ifdef __linux__
	setGlobal(INetwork::enEventFD, (flowGlobalType) N2::ASIOReactor::newEventFD(reactor));
#endif


	int priBins[] = { 1, 2050, 3050, 4050, 4950, 5050, 7050, 8050, 10050 };
	static_assert( sizeof(priBins) == sizeof(int)*NetworkMetrics::PRIORITY_BINS, "Fix priority bins");
	for(int i=0; i<NetworkMetrics::PRIORITY_BINS; i++)
		networkMetrics.priorityBins[i] = priBins[i];
	updateNow();

}

ACTOR Future<Void> Net2::logTimeOffset() {
	loop {
		double processTime = timer_monotonic();
		double systemTime = timer();
		TraceEvent("ProcessTimeOffset").detailf("ProcessTime", "%lf", processTime).detailf("SystemTime", "%lf", systemTime).detailf("OffsetFromSystemTime", "%lf", processTime - systemTime);
		wait(::delay(FLOW_KNOBS->TIME_OFFSET_LOGGING_INTERVAL));
	}
}

void Net2::initMetrics() {
	bytesReceived.init(LiteralStringRef("Net2.BytesReceived"));
	countWriteProbes.init(LiteralStringRef("Net2.CountWriteProbes"));
	countReadProbes.init(LiteralStringRef("Net2.CountReadProbes"));
	countReads.init(LiteralStringRef("Net2.CountReads"));
	countWouldBlock.init(LiteralStringRef("Net2.CountWouldBlock"));
	countWrites.init(LiteralStringRef("Net2.CountWrites"));
	countRunLoop.init(LiteralStringRef("Net2.CountRunLoop"));
	countCantSleep.init(LiteralStringRef("Net2.CountCantSleep"));
	countWontSleep.init(LiteralStringRef("Net2.CountWontSleep"));
	countTimers.init(LiteralStringRef("Net2.CountTimers"));
	countTasks.init(LiteralStringRef("Net2.CountTasks"));
	countYields.init(LiteralStringRef("Net2.CountYields"));
	countYieldBigStack.init(LiteralStringRef("Net2.CountYieldBigStack"));
	countYieldCalls.init(LiteralStringRef("Net2.CountYieldCalls"));
	countASIOEvents.init(LiteralStringRef("Net2.CountASIOEvents"));
	countYieldCallsTrue.init(LiteralStringRef("Net2.CountYieldCallsTrue"));
	countSlowTaskSignals.init(LiteralStringRef("Net2.CountSlowTaskSignals"));
	priorityMetric.init(LiteralStringRef("Net2.Priority"));
	awakeMetric.init(LiteralStringRef("Net2.Awake"));
	slowTaskMetric.init(LiteralStringRef("Net2.SlowTask"));
}

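// The main run loop: optionally calls the installed runCycleFunc, sleeps in the ASIO reactor until
// the next timer or an external wakeup, moves expired timers and cross-thread tasks onto the ready
// queue, then drains ready tasks until check_yield() says the time slice is exhausted.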
void Net2::run() {
	TraceEvent::setNetworkThread();
	TraceEvent("Net2Running");

	thread_network = this;

#ifdef WIN32
	if (timeBeginPeriod(1) != TIMERR_NOERROR)
		TraceEvent(SevError, "TimeBeginPeriodError");
#endif

	timeOffsetLogger = logTimeOffset();
	const char *flow_profiler_enabled = getenv("FLOW_PROFILER_ENABLED");
	if (flow_profiler_enabled != nullptr && *flow_profiler_enabled != '\0') {
		// The empty string check is to allow running `FLOW_PROFILER_ENABLED= ./fdbserver` to force disabling flow profiling at startup.
		startProfiling(this);
	}

	// Get the address of the launch function
	typedef void (*runCycleFuncPtr)();
	runCycleFuncPtr runFunc = reinterpret_cast<runCycleFuncPtr>(reinterpret_cast<flowGlobalType>(g_network->global(INetwork::enRunCycleFunc)));

	double nnow = timer_monotonic();

	while(!stopped) {
		++countRunLoop;

		if (runFunc) {
			tsc_begin = __rdtsc();
			taskBegin = timer_monotonic();
			runFunc();
			checkForSlowTask(tsc_begin, __rdtsc(), timer_monotonic() - taskBegin, TaskRunCycleFunction);
		}

		double sleepTime = 0;
		bool b = ready.empty();
		if (b) {
			b = threadReady.canSleep();
			if (!b) ++countCantSleep;
		} else
			++countWontSleep;
		if (b) {
			sleepTime = 1e99;
			if (!timers.empty())
				sleepTime = timers.top().at - timer_monotonic();  // + 500e-6?
		}

		awakeMetric = false;
		if( sleepTime > 0 )
			priorityMetric = 0;
		reactor.sleepAndReact(sleepTime);
		awakeMetric = true;

		updateNow();
		double now = this->currentTime;

		if ((now-nnow) > FLOW_KNOBS->SLOW_LOOP_CUTOFF && g_nondeterministic_random->random01() < (now-nnow)*FLOW_KNOBS->SLOW_LOOP_SAMPLING_RATE)
			TraceEvent("SomewhatSlowRunLoopTop").detail("Elapsed", now - nnow);

		if (sleepTime) trackMinPriority( 0, now );
		while (!timers.empty() && timers.top().at < now) {
			++countTimers;
			ready.push( timers.top() );
			timers.pop();
		}

		processThreadReady();

		tsc_begin = __rdtsc();
		tsc_end = tsc_begin + FLOW_KNOBS->TSC_YIELD_TIME;
		taskBegin = timer_monotonic();
		numYields = 0;
		int minTaskID = TaskMaxPriority;

		while (!ready.empty()) {
			++countTasks;
			currentTaskID = ready.top().taskID;
			priorityMetric = currentTaskID;
			minTaskID = std::min(minTaskID, currentTaskID);
			Task* task = ready.top().task;
			ready.pop();

			try {
				(*task)();
			} catch (Error& e) {
				TraceEvent(SevError, "TaskError").error(e);
			} catch (...) {
				TraceEvent(SevError, "TaskError").error(unknown_error());
			}

			if (check_yield(TaskMaxPriority, true)) { ++countYields; break; }
		}

		nnow = timer_monotonic();

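		// Harvest backtraces recorded by the SIGPROF-based slow task profiler: with SIGPROF briefly
		// blocked, swap the signal handler's buffer (net2backtraces) with other_backtraces if anything
		// was recorded, then log each harvested ProfilingSample outside the masked region.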
#if defined(__linux__)
		if(FLOW_KNOBS->SLOWTASK_PROFILING_INTERVAL > 0) {
			sigset_t orig_set;
			pthread_sigmask(SIG_BLOCK, &sigprof_set, &orig_set);

			size_t other_offset = net2backtraces_offset;
			bool was_overflow = net2backtraces_overflow;
			int signal_count = net2backtraces_count;

			countSlowTaskSignals += signal_count;

			if (other_offset) {
				volatile void** _traces = net2backtraces;
				net2backtraces = other_backtraces;
				other_backtraces = _traces;

				net2backtraces_offset = 0;
			}

			net2backtraces_overflow = false;
			net2backtraces_count = 0;

			pthread_sigmask(SIG_SETMASK, &orig_set, NULL);

			if (was_overflow) {
				TraceEvent("Net2SlowTaskOverflow")
					.detail("SignalsReceived", signal_count)
					.detail("BackTraceHarvested", other_offset != 0);
			}
			if (other_offset) {
				size_t iter_offset = 0;
				while (iter_offset < other_offset) {
					ProfilingSample *ps = (ProfilingSample *)(other_backtraces + iter_offset);
					TraceEvent(SevWarn, "Net2SlowTaskTrace").detailf("TraceTime", "%.6f", ps->timestamp).detail("Trace", platform::format_backtrace(ps->frames, ps->length));
					iter_offset += ps->length + 2;
				}
			}

			// to keep the thread liveness check happy
			net2liveness = g_nondeterministic_random->random01();
		}
#endif

		if ((nnow-now) > FLOW_KNOBS->SLOW_LOOP_CUTOFF && g_nondeterministic_random->random01() < (nnow-now)*FLOW_KNOBS->SLOW_LOOP_SAMPLING_RATE)
			TraceEvent("SomewhatSlowRunLoopBottom").detail("Elapsed", nnow - now); // This includes the time spent running tasks

		trackMinPriority( minTaskID, nnow );
	}

	#ifdef WIN32
	timeEndPeriod(1);
	#endif
}

void Net2::trackMinPriority( int minTaskID, double now ) {
	if (minTaskID != lastMinTaskID)
		for(int c=0; c<NetworkMetrics::PRIORITY_BINS; c++) {
			int64_t pri = networkMetrics.priorityBins[c];
			if (pri >= minTaskID && pri < lastMinTaskID) {  // busy -> idle
				double busyFor = lastPriorityTrackTime - priorityTimer[c];
				networkMetrics.secSquaredPriorityBlocked[c] += busyFor*busyFor;
			}
			if (pri < minTaskID && pri >= lastMinTaskID) {  // idle -> busy
				priorityTimer[c] = now;
			}
		}
	lastMinTaskID = minTaskID;
	lastPriorityTrackTime = now;
}

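// Drains tasks queued from other threads (via onMainThread) into the ready queue. Cross-thread
// tasks are enqueued with only (taskID << 32) as their priority; the issue-counter subtraction is
// applied here so they stay FIFO within a priority class, like locally queued tasks.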
void Net2::processThreadReady() {
	while (true) {
		Optional<OrderedTask> t = threadReady.pop();
		if (!t.present()) break;
		t.get().priority -= ++tasksIssued;
		ASSERT( t.get().task != 0 );
		ready.push( t.get() );
	}
}

void Net2::checkForSlowTask(int64_t tscBegin, int64_t tscEnd, double duration, int64_t priority) {
	int64_t elapsed = tscEnd-tscBegin;
	if (elapsed > FLOW_KNOBS->TSC_YIELD_TIME && tscBegin > 0) {
		int i = std::min<double>(NetworkMetrics::SLOW_EVENT_BINS-1, log( elapsed/1e6 ) / log(2.));
		int s = ++networkMetrics.countSlowEvents[i];
		int64_t warnThreshold = g_network->isSimulated() ? 10e9 : 500e6;

		//printf("SlowTask: %d, %d yields\n", (int)(elapsed/1e6), numYields);

		slowTaskMetric->clocks = elapsed;
		slowTaskMetric->duration = (int64_t)(duration*1e9);
		slowTaskMetric->priority = priority;
		slowTaskMetric->numYields = numYields;
		slowTaskMetric->log();

		double sampleRate = std::min(1.0, (elapsed > warnThreshold) ? 1.0 : elapsed / 10e9);
		if(FLOW_KNOBS->SLOWTASK_PROFILING_INTERVAL > 0 && duration > FLOW_KNOBS->SLOWTASK_PROFILING_INTERVAL) {
			sampleRate = 1; // Always include slow task events that could show up in our slow task profiling.
		}

		if ( !DEBUG_DETERMINISM && (g_nondeterministic_random->random01() < sampleRate ))
			TraceEvent(elapsed > warnThreshold ? SevWarnAlways : SevInfo, "SlowTask").detail("TaskID", priority).detail("MClocks", elapsed/1e6).detail("Duration", duration).detail("SampleRate", sampleRate).detail("NumYields", numYields);
	}
}

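// Returns true if the caller should yield: a yield is already pending, the stack has grown past
// g_stackYieldLimit, a higher-priority task is ready, or the current task has exceeded its TSC
// time slice (the case that also triggers slow task detection when called from the run loop).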
bool Net2::check_yield( int taskID, bool isRunLoop ) {
	if(!isRunLoop && numYields > 0) {
		++numYields;
		return true;
	}

	if ((g_stackYieldLimit) && ( (intptr_t)&taskID < g_stackYieldLimit )) {
		++countYieldBigStack;
		return true;
	}

	processThreadReady();

	if (taskID == TaskDefaultYield) taskID = currentTaskID;
	if (!ready.empty() && ready.top().priority > (int64_t(taskID)<<32))  {
		return true;
	}

	// SOMEDAY: Yield if there are lots of higher priority tasks queued?
	int64_t tsc_now = __rdtsc();
	double newTaskBegin = timer_monotonic();
	if (tsc_now < tsc_begin) {
		return true;
	}

	if(isRunLoop) {
		checkForSlowTask(tsc_begin, tsc_now, newTaskBegin-taskBegin, currentTaskID);
	}

	if (tsc_now > tsc_end) {
		++numYields;
		return true;
	}

	taskBegin = newTaskBegin;
	tsc_begin = tsc_now;
	return false;
}

bool Net2::check_yield( int taskID ) {
	return check_yield(taskID, false);
}

Future<class Void> Net2::yield( int taskID ) {
	++countYieldCalls;
	if (taskID == TaskDefaultYield) taskID = currentTaskID;
	if (check_yield(taskID, false)) {
		++countYieldCallsTrue;
		return delay(0, taskID);
	}
	g_network->setCurrentTask(taskID);
	return Void();
}

Future<Void> Net2::delay( double seconds, int taskId ) {
	if (seconds <= 0.) {
		PromiseTask* t = new PromiseTask;
		this->ready.push( OrderedTask( (int64_t(taskId)<<32)-(++tasksIssued), taskId, t) );
		return t->promise.getFuture();
	}
	if (seconds >= 4e12)  // Intervals that overflow an int64_t in microseconds (more than 100,000 years) are treated as infinite
		return Never();

	double at = now() + seconds;
	PromiseTask* t = new PromiseTask;
	this->timers.push( DelayedTask( at, (int64_t(taskId)<<32)-(++tasksIssued), taskId, t ) );
	return t->promise.getFuture();
}

void Net2::onMainThread(Promise<Void>&& signal, int taskID) {
	if (stopped) return;
	PromiseTask* p = new PromiseTask( std::move(signal) );
	int64_t priority = int64_t(taskID)<<32;

	if ( thread_network == this )
	{
		processThreadReady();
		this->ready.push( OrderedTask( priority-(++tasksIssued), taskID, p ) );
	} else {
		if (threadReady.push( OrderedTask( priority, taskID, p ) ))
			reactor.wake();
	}
}

THREAD_HANDLE Net2::startThread( THREAD_FUNC_RETURN (*func) (void*), void *arg ) {
	return ::startThread(func, arg);
}


Future< Reference<IConnection> > Net2::connect( NetworkAddress toAddr, std::string host ) {
	return Connection::connect(&this->reactor.ios, toAddr);
}

ACTOR static Future<std::vector<NetworkAddress>> resolveTCPEndpoint_impl( Net2 *self, std::string host, std::string service) {
	state tcp::resolver tcpResolver(self->reactor.ios);
	Promise<std::vector<NetworkAddress>> promise;
	state Future<std::vector<NetworkAddress>> result = promise.getFuture();

	tcpResolver.async_resolve(tcp::resolver::query(host, service), [=](const boost::system::error_code &ec, tcp::resolver::iterator iter) {
		if(ec) {
			promise.sendError(lookup_failed());
			return;
		}

		std::vector<NetworkAddress> addrs;

		tcp::resolver::iterator end;
		while(iter != end) {
			auto endpoint = iter->endpoint();
			auto addr = endpoint.address();
			if (addr.is_v6()) {
				addrs.push_back(NetworkAddress(IPAddress(addr.to_v6().to_bytes()), endpoint.port()));
			} else {
				addrs.push_back(NetworkAddress(addr.to_v4().to_ulong(), endpoint.port()));
			}
			++iter;
		}

		if(addrs.empty()) {
			promise.sendError(lookup_failed());
		}
		else {
			promise.send(addrs);
		}
	});

	wait(ready(result));
	tcpResolver.cancel();

	return result.get();
}

Future<std::vector<NetworkAddress>> Net2::resolveTCPEndpoint( std::string host, std::string service) {
	return resolveTCPEndpoint_impl(this, host, service);
}

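// Determines whether addr is local by "connecting" a UDP socket to it (no packets are sent) and
// checking whether the kernel-chosen local endpoint address equals the address being tested.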
bool Net2::isAddressOnThisHost( NetworkAddress const& addr ) {
	auto it = addressOnHostCache.find( addr.ip );
	if (it != addressOnHostCache.end())
		return it->second;

	if (addressOnHostCache.size() > 50000) addressOnHostCache.clear();  // Bound cache memory; should not really happen

	try {
		boost::asio::io_service ioService;
		boost::asio::ip::udp::socket socket(ioService);
		boost::asio::ip::udp::endpoint endpoint(tcpAddress(addr.ip), 1);
		socket.connect(endpoint);
		bool local = addr.ip.isV6() ? socket.local_endpoint().address().to_v6().to_bytes() == addr.ip.toV6()
		                            : socket.local_endpoint().address().to_v4().to_ulong() == addr.ip.toV4();
		socket.close();
		if (local) TraceEvent(SevInfo, "AddressIsOnHost").detail("Address", addr);
		return addressOnHostCache[ addr.ip ] = local;
	}
	catch(boost::system::system_error e)
	{
		TraceEvent(SevWarnAlways, "IsAddressOnHostError").detail("Address", addr).detail("ErrDesc", e.what()).detail("ErrCode", e.code().value());
		return addressOnHostCache[ addr.ip ] = false;
	}
}

Reference<IListener> Net2::listen( NetworkAddress localAddr ) {
	try {
		return Reference<IListener>( new Listener( reactor.ios, localAddr ) );
	} catch (boost::system::system_error const& e) {
		Error x;
		if(e.code().value() == EADDRINUSE)
			x = address_in_use();
		else if(e.code().value() == EADDRNOTAVAIL)
			x = invalid_local_address();
		else
			x = bind_failed();
		TraceEvent("Net2ListenError").error(x).detail("Message", e.what());
		throw x;
	} catch (std::exception const& e) {
		Error x = unknown_error();
		TraceEvent("Net2ListenError").error(x).detail("Message", e.what());
		throw x;
	} catch (...) {
		Error x = unknown_error();
		TraceEvent("Net2ListenError").error(x);
		throw x;
	}
}

void Net2::getDiskBytes( std::string const& directory, int64_t& free, int64_t& total ) {
	return ::getDiskBytes(directory, free, total);
}

#ifdef __linux__
#include <sys/prctl.h>
#include <pthread.h>
#include <sched.h>
#endif

ASIOReactor::ASIOReactor(Net2* net)
	: network(net), firstTimer(ios), do_not_stop(ios)
{
#ifdef __linux__
	// Reactor flags are used only for experimentation, and are platform-specific
	if (FLOW_KNOBS->REACTOR_FLAGS & 1) {
		prctl(PR_SET_TIMERSLACK, 1, 0, 0, 0);
		printf("Set timerslack to 1ns\n");
	}

	if (FLOW_KNOBS->REACTOR_FLAGS & 2) {
		int ret;
		pthread_t this_thread = pthread_self();
		struct sched_param params;
		params.sched_priority = sched_get_priority_max(SCHED_FIFO);
		ret = pthread_setschedparam(this_thread, SCHED_FIFO, &params);
		if (ret != 0) printf("Error setting priority (%d %d)\n", ret, errno);
		else
			printf("Set scheduler mode to SCHED_FIFO\n");
	}
#endif
}

void ASIOReactor::sleepAndReact(double sleepTime) {
	if (sleepTime > FLOW_KNOBS->BUSY_WAIT_THRESHOLD) {
		if (FLOW_KNOBS->REACTOR_FLAGS & 4) {
#ifdef __linux
			timespec tv;
			tv.tv_sec = 0;
			tv.tv_nsec = 20000;
			nanosleep(&tv, NULL);
#endif
		}
		else
		{
			sleepTime -= FLOW_KNOBS->BUSY_WAIT_THRESHOLD;
			if (sleepTime < 4e12) {
				this->firstTimer.expires_from_now(boost::posix_time::microseconds(int64_t(sleepTime*1e6)));
				this->firstTimer.async_wait(&nullWaitHandler);
			}
			setProfilingEnabled(0); // The following line generates false positives for slow task profiling
			ios.run_one();
			setProfilingEnabled(1);
			this->firstTimer.cancel();
		}
		++network->countASIOEvents;
	} else if (sleepTime > 0) {
		if (!(FLOW_KNOBS->REACTOR_FLAGS & 8))
			threadYield();
	}
	while (ios.poll_one()) ++network->countASIOEvents;  // Make this a task?
}

void ASIOReactor::wake() {
	ios.post( nullCompletionHandler );
}

} // namespace N2

INetwork* newNet2(bool useThreadPool, bool useMetrics) {
	try {
		N2::g_net2 = new N2::Net2(useThreadPool, useMetrics);
	}
	catch(boost::system::system_error e) {
		TraceEvent("Net2InitError").detail("Message", e.what());
		throw unknown_error();
	}
	catch(std::exception const& e) {
		TraceEvent("Net2InitError").detail("Message", e.what());
		throw unknown_error();
	}

	return N2::g_net2;
}

struct TestGVR {
	Standalone<StringRef> key;
	int64_t version;
	Optional<std::pair<UID,UID>> debugID;
	Promise< Optional<Standalone<StringRef>> > reply;

	TestGVR(){}

	template <class Ar>
	void serialize( Ar& ar ) {
		serializer(ar, key, version, debugID, reply);
	}
};

template <class F>
void startThreadF( F && func ) {
	struct Thing {
		F f;
		Thing( F && f ) : f(std::move(f)) {}
		THREAD_FUNC start(void* p) { Thing* self = (Thing*)p; self->f(); delete self; THREAD_RETURN; }
	};
	Thing* t = new Thing(std::move(func));
	startThread(Thing::start, t);
}

void net2_test() {
	/*printf("ThreadSafeQueue test\n");
	printf("  Interface: ");
	ThreadSafeQueue<int> tq;
	ASSERT( tq.canSleep() == true );

	ASSERT( tq.push( 1 ) == true ) ;
	ASSERT( tq.push( 2 ) == false );
	ASSERT( tq.push( 3 ) == false );

	ASSERT( tq.pop().get() == 1 );
	ASSERT( tq.pop().get() == 2 );
	ASSERT( tq.push( 4 ) == false );
	ASSERT( tq.pop().get() == 3 );
	ASSERT( tq.pop().get() == 4 );
	ASSERT( !tq.pop().present() );
	printf("OK\n");

	printf("Threaded: ");
	Event finished, finished2;
	int thread1Iterations = 1000000, thread2Iterations = 100000;

	if (thread1Iterations)
		startThreadF([&](){
			printf("Thread1\n");
			for(int i=0; i<thread1Iterations; i++)
				tq.push(i);
			printf("T1Done\n");
			finished.set();
		});
	if (thread2Iterations)
		startThreadF([&](){
			printf("Thread2\n");
			for(int i=0; i<thread2Iterations; i++)
				tq.push(i + (1<<20));
			printf("T2Done\n");
			finished2.set();
		});
	int c = 0, mx[2]={0, 1<<20}, p = 0;
	while (c < thread1Iterations + thread2Iterations)
	{
		Optional<int> i = tq.pop();
		if (i.present()) {
			int v = i.get();
			++c;
			if (mx[v>>20] != v)
				printf("Wrong value dequeued!\n");
			ASSERT( mx[v>>20] == v );
			mx[v>>20] = v + 1;
		} else {
			++p;
			_mm_pause();
		}
		if ((c&3)==0) tq.canSleep();
	}
	printf("%d %d %x %x %s\n", c, p, mx[0], mx[1], mx[0]==thread1Iterations && mx[1]==(1<<20)+thread2Iterations ? "OK" : "FAIL");

	finished.block();
	finished2.block();


	g_network = newNet2();  // for promise serialization below

	Endpoint destination;

	printf("  Used: %lld\n", FastAllocator<4096>::getTotalMemory());

	char junk[100];

	double before = timer();

	vector<TestGVR> reqs;
	reqs.reserve( 10000 );

	int totalBytes = 0;
	for(int j=0; j<1000; j++) {
		UnsentPacketQueue unsent;
		ReliablePacketList reliable;

		reqs.resize(10000);
		for(int i=0; i<10000; i++) {
			TestGVR &req = reqs[i];
			req.key = LiteralStringRef("Foobar");

			SerializeSource<TestGVR> what(req);

			SendBuffer* pb = unsent.getWriteBuffer();
			ReliablePacket* rp = new ReliablePacket;  // 0

			PacketWriter wr(pb,rp,AssumeVersion(currentProtocolVersion));
			//BinaryWriter wr;
			SplitBuffer packetLen;
			uint32_t len = 0;
			wr.writeAhead(sizeof(len), &packetLen);
			wr << destination.token;
			//req.reply.getEndpoint();
			what.serializePacketWriter(wr);
			//wr.serializeBytes(junk, 43);

			unsent.setWriteBuffer(wr.finish());
			len = wr.size() - sizeof(len);
			packetLen.write(&len, sizeof(len));

			//totalBytes += wr.getLength();
			totalBytes += wr.size();

			if (rp) reliable.insert(rp);
		}
		reqs.clear();
		unsent.discardAll();
		reliable.discardAll();
	}

	printf("SimSend x 1Kx10K: %0.2f sec\n", timer()-before);
	printf("  Bytes: %d\n", totalBytes);
	printf("  Used: %lld\n", FastAllocator<4096>::getTotalMemory());
	*/
};