1 //
2 // This file is part of the aMule Project.
3 //
4 // Copyright (c) 2011-2011 aMule Team ( admin@amule.org / http://www.amule.org )
5 // Copyright (c) 2011-2011 Stu Redman ( admin@amule.org / http://www.amule.org )
6 //
7 // Any parts of this program derived from the xMule, lMule or eMule project,
8 // or contributed by third-party developers are copyrighted by their
9 // respective authors.
10 //
11 // This program is free software; you can redistribute it and/or modify
12 // it under the terms of the GNU General Public License as published by
13 // the Free Software Foundation; either version 2 of the License, or
14 // (at your option) any later version.
15 //
16 // This program is distributed in the hope that it will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 // GNU General Public License for more details.
20 //
21 // You should have received a copy of the GNU General Public License
22 // along with this program; if not, write to the Free Software
23 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301, USA
24 //
25 
26 #ifdef HAVE_CONFIG_H
27 #	include "config.h"		// Needed for HAVE_BOOST_SOURCES
28 #endif
29 
30 #ifdef _MSC_VER
31 #define _WIN32_WINNT 0x0501		// Boost complains otherwise
32 #endif
33 
34 // Windows requires that Boost headers are included before wx headers.
35 // This works if precompiled headers are disabled for this file.
36 
37 #define BOOST_ALL_NO_LIB
38 
39 // Suppress warning caused by faulty boost/preprocessor/config/config.hpp in Boost 1.49
40 #if defined __GNUC__ && ! defined __GXX_EXPERIMENTAL_CXX0X__ && __cplusplus < 201103L
41 	#define BOOST_PP_VARIADICS 0
42 #endif
43 
44 #include <algorithm>	// Needed for std::min - Boost up to 1.54 fails to compile with MSVC 2013 otherwise
45 
46 #include <boost/asio.hpp>
47 #include <boost/bind.hpp>
48 #include <boost/version.hpp>
49 
50 //
51 // Do away with building Boost.System, adding lib paths...
52 // Just include the single file and be done.
53 //
54 #ifdef HAVE_BOOST_SOURCES
55 #	include <boost/../libs/system/src/error_code.cpp>
56 #else
57 #	include <boost/system/error_code.hpp>
58 #endif
59 
60 #include "LibSocket.h"
61 #include <wx/thread.h>		// wxMutex
62 #include <wx/intl.h>		// _()
63 #include <common/Format.h>	// Needed for CFormat
64 #include "Logger.h"
65 #include "GuiEvents.h"
66 #include "amuleIPV4Address.h"
67 #include "MuleUDPSocket.h"
68 #include "OtherFunctions.h"	// DeleteContents
69 #include "ScopedPtr.h"
70 #include <common/Macros.h>
71 
72 using namespace boost::asio;
73 using namespace boost::system;	// for error_code
74 static io_service s_io_service;
75 
76 // Number of threads in the Asio thread pool
77 const int CAsioService::m_numberOfThreads = 4;
78 
79 /**
80  * ASIO Client TCP socket implementation
81  */
82 
83 class CamuleIPV4Endpoint : public ip::tcp::endpoint {
84 public:
CamuleIPV4Endpoint()85 	CamuleIPV4Endpoint() {}
86 
CamuleIPV4Endpoint(const CamuleIPV4Endpoint & impl)87 	CamuleIPV4Endpoint(const CamuleIPV4Endpoint & impl) : ip::tcp::endpoint(impl) {}
CamuleIPV4Endpoint(const ip::tcp::endpoint & ep)88 	CamuleIPV4Endpoint(const ip::tcp::endpoint & ep) { * this = ep; }
CamuleIPV4Endpoint(const ip::udp::endpoint & ep)89 	CamuleIPV4Endpoint(const ip::udp::endpoint & ep) { address(ep.address()); port(ep.port()); }
90 
operator =(const ip::tcp::endpoint & ep)91 	const CamuleIPV4Endpoint& operator = (const ip::tcp::endpoint & ep)
92 	{
93 		* (ip::tcp::endpoint *) this = ep;
94 		return *this;
95 	}
96 };
97 
98 class CAsioSocketImpl
99 {
100 public:
101 	// cppcheck-suppress uninitMemberVar m_readBufferPtr
CAsioSocketImpl(CLibSocket * libSocket)102 	CAsioSocketImpl(CLibSocket * libSocket) :
103 		m_libSocket(libSocket),
104 		m_strand(s_io_service),
105 		m_timer(s_io_service)
106 	{
107 		m_OK = false;
108 		m_blocksRead = false;
109 		m_blocksWrite = false;
110 		m_ErrorCode = 0;
111 		m_readBuffer = NULL;
112 		m_readBufferSize = 0;
113 		m_readPending = false;
114 		m_readBufferContent = 0;
115 		m_eventPending = false;
116 		m_port = 0;
117 		m_sendBuffer = NULL;
118 		m_connected = false;
119 		m_closed = false;
120 		m_isDestroying = false;
121 		m_proxyState = false;
122 		m_notify = true;
123 		m_sync = false;
124 		m_IP = wxT("?");
125 		m_IPint = 0;
126 		m_socket = new ip::tcp::socket(s_io_service);
127 
128 		// Set socket to non blocking
129 		m_socket->non_blocking();
130 	}
131 
~CAsioSocketImpl()132 	~CAsioSocketImpl()
133 	{
134 		delete[] m_readBuffer;
135 		delete[] m_sendBuffer;
136 	}
137 
Notify(bool notify)138 	void Notify(bool notify)
139 	{
140 		m_notify = notify;
141 	}
142 
Connect(const amuleIPV4Address & adr,bool wait)143 	bool Connect(const amuleIPV4Address& adr, bool wait)
144 	{
145 		if (!m_proxyState) {
146 			SetIp(adr);
147 		}
148 		m_port = adr.Service();
149 		m_closed = false;
150 		m_OK = false;
151 		m_sync = !m_notify;		// set this once for the whole lifetime of the socket
152 		AddDebugLogLineF(logAsio, CFormat(wxT("Connect %s %p")) % m_IP % this);
153 
154 		if (wait || m_sync) {
155 			error_code ec;
156 			m_socket->connect(adr.GetEndpoint(), ec);
157 			m_OK = !ec;
158 			m_connected = m_OK;
159 			return m_OK;
160 		} else {
161 			m_socket->async_connect(adr.GetEndpoint(),
162 				m_strand.wrap(boost::bind(& CAsioSocketImpl::HandleConnect, this, placeholders::error)));
163 			// m_OK and return are false because we are not connected yet
164 			return false;
165 		}
166 	}
167 
IsConnected() const168 	bool IsConnected() const
169 	{
170 		return m_connected;
171 	}
172 
173 	// For wxSocketClient, Ok won't return true unless the client is connected to a server.
IsOk() const174 	bool IsOk() const
175 	{
176 		return m_OK;
177 	}
178 
IsDestroying() const179 	bool IsDestroying() const
180 	{
181 		return m_isDestroying;
182 	}
183 
184 	// Returns the actual error code
LastError() const185 	int LastError() const
186 	{
187 		return m_ErrorCode;
188 	}
189 
190 	// Is reading blocked?
BlocksRead() const191 	bool BlocksRead() const
192 	{
193 		return m_blocksRead;
194 	}
195 
196 	// Is writing blocked?
BlocksWrite() const197 	bool BlocksWrite() const
198 	{
199 		return m_blocksWrite;
200 	}
201 
202 	// Problem: wx sends an event when data gets available, so first there is an event, then Read() is called
203 	// Asio can read async with callback, so you first read, then you get an event.
204 	// Strategy:
205 	// - Read some data in background into a buffer
206 	// - Callback posts event when something is there
207 	// - Read data from buffer
208 	// - If data is exhausted, start reading more in background
209 	// - If not, post another event (making sure events don't pile up though)
Read(char * buf,uint32 bytesToRead)210 	uint32 Read(char * buf, uint32 bytesToRead)
211 	{
212 		if (bytesToRead == 0) {			// huh?
213 			return 0;
214 		}
215 
216 		if (m_sync) {
217 			return ReadSync(buf, bytesToRead);
218 		}
219 
220 		if (m_ErrorCode) {
221 			AddDebugLogLineF(logAsio, CFormat(wxT("Read1 %s %d - Error")) % m_IP % bytesToRead);
222 			return 0;
223 		}
224 
225 		if (m_readPending					// Background read hasn't completed.
226 			|| m_readBufferContent == 0) {	// shouldn't be if it's not pending
227 
228 			m_blocksRead = true;
229 			AddDebugLogLineF(logAsio, CFormat(wxT("Read1 %s %d - Block")) % m_IP % bytesToRead);
230 			return 0;
231 		}
232 
233 		m_blocksRead = false;	// shouldn't be needed
234 
235 		// Read from our buffer
236 		uint32 readCache = std::min(m_readBufferContent, bytesToRead);
237 		memcpy(buf, m_readBufferPtr, readCache);
238 		m_readBufferContent	-= readCache;
239 		m_readBufferPtr		+= readCache;
240 
241 		AddDebugLogLineF(logAsio, CFormat(wxT("Read2 %s %d - %d")) % m_IP % bytesToRead % readCache);
242 		if (m_readBufferContent) {
243 			// Data left, post another event
244 			PostReadEvent(1);
245 		} else {
246 			// Nothing left, read more
247 			StartBackgroundRead();
248 		}
249 		return readCache;
250 	}
251 
252 
253 	// Make a copy of the data and send it in background
254 	// - unless a background send is already going on
Write(const void * buf,uint32 nbytes)255 	uint32 Write(const void * buf, uint32 nbytes)
256 	{
257 		if (m_sync) {
258 			return WriteSync(buf, nbytes);
259 		}
260 
261 		if (m_sendBuffer) {
262 			m_blocksWrite = true;
263 			AddDebugLogLineF(logAsio, CFormat(wxT("Write blocks %d %p %s")) % nbytes % m_sendBuffer % m_IP);
264 			return 0;
265 		}
266 		AddDebugLogLineF(logAsio, CFormat(wxT("Write %d %s")) % nbytes % m_IP);
267 		m_sendBuffer = new char[nbytes];
268 		memcpy(m_sendBuffer, buf, nbytes);
269 		m_strand.dispatch(boost::bind(& CAsioSocketImpl::DispatchWrite, this, nbytes));
270 		m_ErrorCode = 0;
271 		return nbytes;
272 	}
273 
274 
Close()275 	void Close()
276 	{
277 		if (!m_closed) {
278 			m_closed = true;
279 			m_connected = false;
280 			if (m_sync || s_io_service.stopped()) {
281 				DispatchClose();
282 			} else {
283 				m_strand.dispatch(boost::bind(& CAsioSocketImpl::DispatchClose, this));
284 			}
285 		}
286 	}
287 
288 
Destroy()289 	void Destroy()
290 	{
291 		if (m_isDestroying) {
292 			AddDebugLogLineC(logAsio, CFormat(wxT("Destroy() already dying socket %p %p %s")) % m_libSocket % this % m_IP);
293 			return;
294 		}
295 		m_isDestroying = true;
296 		AddDebugLogLineF(logAsio, CFormat(wxT("Destroy() %p %p %s")) % m_libSocket % this % m_IP);
297 		Close();
298 		if (m_sync || s_io_service.stopped()) {
299 			HandleDestroy();
300 		} else {
301 			// Close prevents creation of any more callbacks, but does not clear any callbacks already
302 			// sitting in Asio's event queue (I have seen such a crash).
303 			// So create a delay timer so they can be called until core is notified.
304 			m_timer.expires_from_now(boost::posix_time::seconds(1));
305 			m_timer.async_wait(m_strand.wrap(boost::bind(& CAsioSocketImpl::HandleDestroy, this)));
306 		}
307 	}
308 
309 
GetPeer()310 	wxString GetPeer()
311 	{
312 		return m_IP;
313 	}
314 
GetPeerInt()315 	uint32 GetPeerInt()
316 	{
317 		return m_IPint;
318 	}
319 
320 	//
321 	// Bind socket to local endpoint if user wants to choose the local address
322 	//
SetLocal(const amuleIPV4Address & local)323 	void SetLocal(const amuleIPV4Address& local)
324 	{
325 		error_code ec;
326 		if (!m_socket->is_open()) {
327 			// Socket is usually still closed when this is called
328 			m_socket->open(boost::asio::ip::tcp::v4(), ec);
329 			if (ec) {
330 				AddDebugLogLineC(logAsio, CFormat(wxT("Can't open socket : %s")) % ec.message());
331 			}
332 		}
333 		//
334 		// We are using random (OS-defined) local ports.
335 		// To set a constant output port, first call
336 		// m_socket->set_option(socket_base::reuse_address(true));
337 		// and then set the endpoint's port to it.
338 		//
339 		CamuleIPV4Endpoint endpoint(local.GetEndpoint());
340 		endpoint.port(0);
341 		m_socket->bind(endpoint, ec);
342 		if (ec) {
343 			AddDebugLogLineC(logAsio, CFormat(wxT("Can't bind socket to local endpoint %s : %s"))
344 				% local.IPAddress() % ec.message());
345 		} else {
346 			AddDebugLogLineF(logAsio, CFormat(wxT("Bound socket to local endpoint %s")) % local.IPAddress());
347 		}
348 	}
349 
350 
EventProcessed()351 	void EventProcessed()
352 	{
353 		m_eventPending = false;
354 	}
355 
SetWrapSocket(CLibSocket * socket)356 	void SetWrapSocket(CLibSocket * socket)
357 	{
358 		m_libSocket = socket;
359 		// Also do some setting up
360 		m_OK = true;
361 		m_connected = true;
362 		// Start reading
363 		StartBackgroundRead();
364 	}
365 
UpdateIP()366 	bool UpdateIP()
367 	{
368 		error_code ec;
369 		amuleIPV4Address addr = CamuleIPV4Endpoint(m_socket->remote_endpoint(ec));
370 		if (SetError(ec)) {
371 			AddDebugLogLineN(logAsio, CFormat(wxT("UpdateIP failed %p %s")) % this % ec.message());
372 			return false;
373 		}
374 		SetIp(addr);
375 		m_port = addr.Service();
376 		AddDebugLogLineF(logAsio, CFormat(wxT("UpdateIP %s %d %p")) % m_IP % m_port % this);
377 		return true;
378 	}
379 
GetIP() const380 	const wxChar * GetIP() const { return m_IP; }
GetPort() const381 	uint16 GetPort() const { return m_port; }
382 
GetAsioSocket()383 	ip::tcp::socket & GetAsioSocket()
384 	{
385 		return * m_socket;
386 	}
387 
GetProxyState() const388 	bool GetProxyState() const { return m_proxyState; }
389 
SetProxyState(bool state,const amuleIPV4Address * adr)390 	void SetProxyState(bool state, const amuleIPV4Address * adr)
391 	{
392 		m_proxyState = state;
393 		if (state) {
394 			// Start. Get the true IP for logging.
395 			wxASSERT(adr);
396 			SetIp(*adr);
397 			AddDebugLogLineF(logAsio, CFormat(wxT("SetProxyState to proxy %s")) % m_IP);
398 		} else {
399 			// Transition from proxy to normal mode
400 			AddDebugLogLineF(logAsio, CFormat(wxT("SetProxyState to normal %s")) % m_IP);
401 			m_ErrorCode = 0;
402 		}
403 	}
404 
405 private:
406 	//
407 	// Dispatch handlers
408 	// Access to m_socket is all bundled in the thread running s_io_service to avoid
409 	// concurrent access to the socket from several threads.
410 	// So once things are running (after connect), all access goes through one of these handlers.
411 	//
DispatchClose()412 	void DispatchClose()
413 	{
414 		error_code ec;
415 		m_socket->close(ec);
416 		if (ec) {
417 			AddDebugLogLineC(logAsio, CFormat(wxT("Close error %s %s")) % m_IP % ec.message());
418 		} else {
419 			AddDebugLogLineF(logAsio, CFormat(wxT("Closed %s")) % m_IP);
420 		}
421 	}
422 
DispatchBackgroundRead()423 	void DispatchBackgroundRead()
424 	{
425 		AddDebugLogLineF(logAsio, CFormat(wxT("DispatchBackgroundRead %s")) % m_IP);
426 		m_socket->async_read_some(null_buffers(),
427 			m_strand.wrap(boost::bind(& CAsioSocketImpl::HandleRead, this, placeholders::error)));
428 	}
429 
DispatchWrite(uint32 nbytes)430 	void DispatchWrite(uint32 nbytes)
431 	{
432 		async_write(*m_socket, buffer(m_sendBuffer, nbytes),
433 			m_strand.wrap(boost::bind(& CAsioSocketImpl::HandleSend, this, placeholders::error, placeholders::bytes_transferred)));
434 	}
435 
436 	//
437 	// Completion handlers for async requests
438 	//
439 
HandleConnect(const error_code & err)440 	void HandleConnect(const error_code& err)
441 	{
442 		m_OK = !err;
443 		AddDebugLogLineF(logAsio, CFormat(wxT("HandleConnect %d %s")) % m_OK % m_IP);
444 		if (m_isDestroying) {
445 			AddDebugLogLineF(logAsio, CFormat(wxT("HandleConnect: socket pending for deletion %s")) % m_IP);
446 		} else {
447 			CoreNotify_LibSocketConnect(m_libSocket, err.value());
448 			if (m_OK) {
449 				// After connect also send a OUTPUT event to show data is available
450 				CoreNotify_LibSocketSend(m_libSocket, 0);
451 				// Start reading
452 				StartBackgroundRead();
453 				m_connected = true;
454 			}
455 		}
456 	}
457 
HandleSend(const error_code & err,size_t DEBUG_ONLY (bytes_transferred))458 	void HandleSend(const error_code& err, size_t DEBUG_ONLY(bytes_transferred) )
459 	{
460 		delete[] m_sendBuffer;
461 		m_sendBuffer = NULL;
462 
463 		if (m_isDestroying) {
464 			AddDebugLogLineF(logAsio, CFormat(wxT("HandleSend: socket pending for deletion %s")) % m_IP);
465 		} else {
466 			if (SetError(err)) {
467 				AddDebugLogLineN(logAsio, CFormat(wxT("HandleSend Error %d %s")) % bytes_transferred % m_IP);
468 				PostLostEvent();
469 			} else {
470 				AddDebugLogLineF(logAsio, CFormat(wxT("HandleSend %d %s")) % bytes_transferred % m_IP);
471 				m_blocksWrite = false;
472 				CoreNotify_LibSocketSend(m_libSocket, m_ErrorCode);
473 			}
474 		}
475 	}
476 
HandleRead(const error_code & ec)477 	void HandleRead(const error_code & ec)
478 	{
479 		if (m_isDestroying) {
480 			AddDebugLogLineF(logAsio, CFormat(wxT("HandleRead: socket pending for deletion %s")) % m_IP);
481 		}
482 
483 		if (SetError(ec)) {
484 			// This is what we get in Windows when a connection gets closed from remote.
485 			AddDebugLogLineN(logAsio, CFormat(wxT("HandleReadError %s %s")) % m_IP % ec.message());
486 			PostLostEvent();
487 			return;
488 		}
489 
490 		error_code ec2;
491 		uint32 avail = m_socket->available(ec2);
492 		if (SetError(ec2)) {
493 			AddDebugLogLineN(logAsio, CFormat(wxT("HandleReadError available %d %s %s")) % avail % m_IP % ec2.message());
494 			PostLostEvent();
495 			return;
496 		}
497 		if (avail == 0) {
498 			// This is what we get in Linux when a connection gets closed from remote.
499 			AddDebugLogLineF(logAsio, CFormat(wxT("HandleReadError nothing available %s")) % m_IP);
500 			SetError();
501 			PostLostEvent();
502 			return;
503 		}
504 		AddDebugLogLineF(logAsio, CFormat(wxT("HandleRead %d %s")) % avail % m_IP);
505 
506 		// adjust (or create) our read buffer
507 		if (m_readBufferSize < avail) {
508 			delete[] m_readBuffer;
509 			m_readBuffer = new char[avail];
510 			m_readBufferSize = avail;
511 		}
512 		m_readBufferPtr = m_readBuffer;
513 
514 		// read available data
515 		m_readBufferContent = m_socket->read_some(buffer(m_readBuffer, avail), ec2);
516 		if (SetError(ec2) || m_readBufferContent == 0) {
517 			AddDebugLogLineN(logAsio, CFormat(wxT("HandleReadError read %d %s %s")) % m_readBufferContent % m_IP % ec2.message());
518 			PostLostEvent();
519 			return;
520 		}
521 
522 		m_readPending = false;
523 		m_blocksRead = false;
524 		PostReadEvent(2);
525 	}
526 
HandleDestroy()527 	void HandleDestroy()
528 	{
529 		AddDebugLogLineF(logAsio, CFormat(wxT("HandleDestroy() %p %p %s")) % m_libSocket % this % m_IP);
530 		CoreNotify_LibSocketDestroy(m_libSocket);
531 	}
532 
533 
534 	//
535 	// Other functions
536 	//
537 
StartBackgroundRead()538 	void StartBackgroundRead()
539 	{
540 		m_readPending = true;
541 		m_readBufferContent = 0;
542 		m_strand.dispatch(boost::bind(& CAsioSocketImpl::DispatchBackgroundRead, this));
543 	}
544 
PostReadEvent(int DEBUG_ONLY (from))545 	void PostReadEvent(int DEBUG_ONLY(from) )
546 	{
547 		if (!m_eventPending) {
548 			AddDebugLogLineF(logAsio, CFormat(wxT("Post read event %d %s")) % from % m_IP);
549 			m_eventPending = true;
550 			CoreNotify_LibSocketReceive(m_libSocket, m_ErrorCode);
551 		}
552 	}
553 
PostLostEvent()554 	void PostLostEvent()
555 	{
556 		if (!m_isDestroying && !m_closed) {
557 			CoreNotify_LibSocketLost(m_libSocket);
558 		}
559 	}
560 
SetError()561 	void SetError()
562 	{
563 		m_ErrorCode = 2;
564 	}
565 
SetError(const error_code & err)566 	bool SetError(const error_code & err)
567 	{
568 		m_ErrorCode = err.value();
569 		return m_ErrorCode != errc::success;
570 	}
571 
572 	//
573 	// Synchronous sockets (amulecmd)
574 	//
ReadSync(char * buf,uint32 bytesToRead)575 	uint32 ReadSync(char * buf, uint32 bytesToRead)
576 	{
577 		error_code ec;
578 		uint32 received = read(*m_socket, buffer(buf, bytesToRead), ec);
579 		SetError(ec);
580 		return received;
581 	}
582 
WriteSync(const void * buf,uint32 nbytes)583 	uint32 WriteSync(const void * buf, uint32 nbytes)
584 	{
585 		error_code ec;
586 		uint32 sent = write(*m_socket, buffer(buf, nbytes), ec);
587 		SetError(ec);
588 		return sent;
589 	}
590 
591 	//
592 	// Access to even const & wxString is apparently not thread-safe.
593 	// Locks are set/removed in wx and reference counts can go astray.
594 	// So store our IP string in a wxString which is used nowhere.
595 	// Store a pointer to its string buffer as well and use THAT everywhere.
596 	//
SetIp(const amuleIPV4Address & adr)597 	void SetIp(const amuleIPV4Address& adr)
598 	{
599 		m_IPstring = adr.IPAddress();
600 		m_IP = m_IPstring.c_str();
601 		m_IPint = StringIPtoUint32(m_IPstring);
602 	}
603 
604 	CLibSocket	*	m_libSocket;
605 	ip::tcp::socket	*	m_socket;
606 										// remote IP
607 	wxString		m_IPstring;			// as String (use nowhere because of threading!)
608 	const wxChar *	m_IP;				// as char*  (use in debug logs)
609 	uint32			m_IPint;			// as int
610 	uint16			m_port;				// remote port
611 	bool			m_OK;
612 	int				m_ErrorCode;
613 	bool			m_blocksRead;
614 	bool			m_blocksWrite;
615 	char *			m_readBuffer;
616 	uint32			m_readBufferSize;
617 	char *			m_readBufferPtr;
618 	bool			m_readPending;
619 	uint32			m_readBufferContent;
620 	bool			m_eventPending;
621 	char *			m_sendBuffer;
622 	io_service::strand	m_strand;		// handle synchronisation in io_service thread pool
623 	deadline_timer	m_timer;
624 	bool			m_connected;
625 	bool			m_closed;
626 	bool			m_isDestroying;		// true if Destroy() was called
627 	bool			m_proxyState;
628 	bool			m_notify;			// set by Notify()
629 	bool			m_sync;				// copied from !m_notify on Connect()
630 };
631 
632 
633 /**
634  * Library socket wrapper
635  */
636 
CLibSocket(int)637 CLibSocket::CLibSocket(int /* flags */)
638 {
639 	m_aSocket = new CAsioSocketImpl(this);
640 }
641 
642 
~CLibSocket()643 CLibSocket::~CLibSocket()
644 {
645 	AddDebugLogLineF(logAsio, CFormat(wxT("~CLibSocket() %p %p %s")) % this % m_aSocket % m_aSocket->GetIP());
646 	delete m_aSocket;
647 }
648 
649 
Connect(const amuleIPV4Address & adr,bool wait)650 bool CLibSocket::Connect(const amuleIPV4Address& adr, bool wait)
651 {
652 	return m_aSocket->Connect(adr, wait);
653 }
654 
655 
IsConnected() const656 bool CLibSocket::IsConnected() const
657 {
658 	return m_aSocket->IsConnected();
659 }
660 
661 
IsOk() const662 bool CLibSocket::IsOk() const
663 {
664 	return m_aSocket->IsOk();
665 }
666 
667 
GetPeer()668 wxString CLibSocket::GetPeer()
669 {
670 	return m_aSocket->GetPeer();
671 }
672 
673 
GetPeerInt()674 uint32 CLibSocket::GetPeerInt()
675 {
676 	return m_aSocket->GetPeerInt();
677 }
678 
679 
Destroy()680 void CLibSocket::Destroy()
681 {
682 	m_aSocket->Destroy();
683 }
684 
685 
IsDestroying() const686 bool CLibSocket::IsDestroying() const
687 {
688 	return m_aSocket->IsDestroying();
689 }
690 
691 
Notify(bool notify)692 void CLibSocket::Notify(bool notify)
693 {
694 	m_aSocket->Notify(notify);
695 }
696 
697 
Read(void * buffer,uint32 nbytes)698 uint32 CLibSocket::Read(void * buffer, uint32 nbytes)
699 {
700 	return m_aSocket->Read((char *) buffer, nbytes);
701 }
702 
703 
Write(const void * buffer,uint32 nbytes)704 uint32 CLibSocket::Write(const void * buffer, uint32 nbytes)
705 {
706 	return m_aSocket->Write(buffer, nbytes);
707 }
708 
709 
Close()710 void CLibSocket::Close()
711 {
712 	m_aSocket->Close();
713 }
714 
715 
LastError() const716 int CLibSocket::LastError() const
717 {
718 	return m_aSocket->LastError();
719 }
720 
721 
SetLocal(const amuleIPV4Address & local)722 void CLibSocket::SetLocal(const amuleIPV4Address& local)
723 {
724 	m_aSocket->SetLocal(local);
725 }
726 
727 
728 
729 // new Stuff
730 
BlocksRead() const731 bool CLibSocket::BlocksRead() const
732 {
733 	return m_aSocket->BlocksRead();
734 }
735 
736 
BlocksWrite() const737 bool CLibSocket::BlocksWrite() const
738 {
739 	return m_aSocket->BlocksWrite();
740 }
741 
742 
EventProcessed()743 void CLibSocket::EventProcessed()
744 {
745 	m_aSocket->EventProcessed();
746 }
747 
748 
LinkSocketImpl(class CAsioSocketImpl * socket)749 void CLibSocket::LinkSocketImpl(class CAsioSocketImpl * socket)
750 {
751 	delete m_aSocket;
752 	m_aSocket = socket;
753 	m_aSocket->SetWrapSocket(this);
754 }
755 
756 
GetIP() const757 const wxChar * CLibSocket::GetIP() const
758 {
759 	return m_aSocket->GetIP();
760 }
761 
762 
GetProxyState() const763 bool CLibSocket::GetProxyState() const
764 {
765 	return m_aSocket->GetProxyState();
766 }
767 
768 
SetProxyState(bool state,const amuleIPV4Address * adr)769 void CLibSocket::SetProxyState(bool state, const amuleIPV4Address * adr)
770 {
771 	m_aSocket->SetProxyState(state, adr);
772 }
773 
774 
775 /**
776  * ASIO TCP socket server
777  */
778 
779 class CAsioSocketServerImpl : public ip::tcp::acceptor
780 {
781 public:
CAsioSocketServerImpl(const amuleIPV4Address & adr,CLibSocketServer * libSocketServer)782 	CAsioSocketServerImpl(const amuleIPV4Address & adr, CLibSocketServer * libSocketServer)
783 		: ip::tcp::acceptor(s_io_service),
784 		  m_libSocketServer(libSocketServer),
785 		  m_currentSocket(NULL),
786 		  m_strand(s_io_service)
787 	{
788 		m_ok = false;
789 		m_socketAvailable = false;
790 
791 		try {
792 			open(adr.GetEndpoint().protocol());
793 			set_option(ip::tcp::acceptor::reuse_address(true));
794 			bind(adr.GetEndpoint());
795 			listen();
796 			StartAccept();
797 			m_ok = true;
798 			AddDebugLogLineN(logAsio, CFormat(wxT("CAsioSocketServerImpl bind to %s %d")) % adr.IPAddress() % adr.Service());
799 		} catch (const system_error& err) {
800 			AddDebugLogLineC(logAsio, CFormat(wxT("CAsioSocketServerImpl bind to %s %d failed - %s")) % adr.IPAddress() % adr.Service() % err.code().message());
801 		}
802 	}
803 
~CAsioSocketServerImpl()804 	~CAsioSocketServerImpl()
805 	{
806 	}
807 
808 	// For wxSocketServer, Ok will return true if the server could bind to the specified address and is already listening for new connections.
IsOk() const809 	bool IsOk() const { return m_ok; }
810 
Close()811 	void Close() { close();	}
812 
AcceptWith(CLibSocket & socket)813 	bool AcceptWith(CLibSocket & socket)
814 	{
815 		if (!m_socketAvailable) {
816 			AddDebugLogLineF(logAsio, wxT("AcceptWith: nothing there"));
817 			return false;
818 		}
819 
820 		// return the socket we received
821 		socket.LinkSocketImpl(m_currentSocket.release());
822 
823 		// check if we have another socket ready for reception
824 		m_currentSocket.reset(new CAsioSocketImpl(NULL));
825 		error_code ec;
826 		// async_accept does not work if server is non-blocking
827 		// temporarily switch it to non-blocking
828 		non_blocking(true);
829 		// we are set to non-blocking, so this returns right away
830 		accept(m_currentSocket->GetAsioSocket(), ec);
831 		// back to blocking
832 		non_blocking(false);
833 		if (ec || !m_currentSocket->UpdateIP()) {
834 			// nothing there
835 			m_socketAvailable = false;
836 			// start getting another one
837 			StartAccept();
838 			AddDebugLogLineF(logAsio, wxT("AcceptWith: ok, getting another socket in background"));
839 		} else {
840 			// we got another socket right away
841 			m_socketAvailable = true;	// it is already true, but this improves readability
842 			AddDebugLogLineF(logAsio, wxT("AcceptWith: ok, another socket is available"));
843 			// aMule actually doesn't need a notification as it polls the listen socket.
844 			// amuleweb does need it though
845 			CoreNotify_ServerTCPAccept(m_libSocketServer);
846 		}
847 
848 		return true;
849 	}
850 
SocketAvailable() const851 	bool SocketAvailable() const { return m_socketAvailable; }
852 
853 private:
854 
StartAccept()855 	void StartAccept()
856 	{
857 		m_currentSocket.reset(new CAsioSocketImpl(NULL));
858 		async_accept(m_currentSocket->GetAsioSocket(),
859 			m_strand.wrap(boost::bind(& CAsioSocketServerImpl::HandleAccept, this, placeholders::error)));
860 	}
861 
HandleAccept(const error_code & error)862 	void HandleAccept(const error_code& error)
863 	{
864 		if (error) {
865 			AddDebugLogLineC(logAsio, CFormat(wxT("Error in HandleAccept: %s")) % error.message());
866 		} else {
867 			if (m_currentSocket->UpdateIP()) {
868 				AddDebugLogLineN(logAsio, CFormat(wxT("HandleAccept received a connection from %s:%d"))
869 					% m_currentSocket->GetIP() % m_currentSocket->GetPort());
870 				m_socketAvailable = true;
871 				CoreNotify_ServerTCPAccept(m_libSocketServer);
872 				return;
873 			} else {
874 				AddDebugLogLineN(logAsio, wxT("Error in HandleAccept: invalid socket"));
875 			}
876 		}
877 		// We were not successful. Try again.
878 		// Post the request to the event queue to make sure it doesn't get called immediately.
879 		m_strand.post(boost::bind(& CAsioSocketServerImpl::StartAccept, this));
880 	}
881 
882 	// The wrapper object
883 	CLibSocketServer * m_libSocketServer;
884 	// Startup ok
885 	bool m_ok;
886 	// The last socket that connected to us
887 	CScopedPtr<CAsioSocketImpl> m_currentSocket;
888 	// Is there a socket available?
889 	bool m_socketAvailable;
890 	io_service::strand	m_strand;		// handle synchronisation in io_service thread pool
891 };
892 
893 
CLibSocketServer(const amuleIPV4Address & adr,int)894 CLibSocketServer::CLibSocketServer(const amuleIPV4Address& adr, int /* flags */)
895 {
896 	m_aServer = new CAsioSocketServerImpl(adr, this);
897 }
898 
899 
~CLibSocketServer()900 CLibSocketServer::~CLibSocketServer()
901 {
902 	delete m_aServer;
903 }
904 
905 
906 // Accepts an incoming connection request, and creates a new CLibSocket object which represents the server-side of the connection.
907 // Only used in CamuleApp::ListenSocketHandler() and we don't get there.
Accept(bool)908 CLibSocket * CLibSocketServer::Accept(bool /* wait */)
909 {
910 	wxFAIL;
911 	return NULL;
912 }
913 
914 
915 // Accept an incoming connection using the specified socket object.
AcceptWith(CLibSocket & socket,bool WXUNUSED_UNLESS_DEBUG (wait))916 bool CLibSocketServer::AcceptWith(CLibSocket & socket, bool WXUNUSED_UNLESS_DEBUG(wait) )
917 {
918 	wxASSERT(!wait);
919 	return m_aServer->AcceptWith(socket);
920 }
921 
922 
IsOk() const923 bool CLibSocketServer::IsOk() const
924 {
925 	return m_aServer->IsOk();
926 }
927 
928 
Close()929 void CLibSocketServer::Close()
930 {
931 	m_aServer->Close();
932 }
933 
934 
SocketAvailable()935 bool CLibSocketServer::SocketAvailable()
936 {
937 	return m_aServer->SocketAvailable();
938 }
939 
940 
941 /**
942  * ASIO UDP socket implementation
943  */
944 
945 class CAsioUDPSocketImpl
946 {
947 private:
948 	// UDP data block
949 	class CUDPData {
950 	public:
951 		char * buffer;
952 		uint32 size;
953 		amuleIPV4Address ipadr;
954 
CUDPData(const void * src,uint32 _size,amuleIPV4Address adr)955 		CUDPData(const void * src, uint32 _size, amuleIPV4Address adr) :
956 			size(_size), ipadr(adr)
957 		{
958 			buffer = new char[size];
959 			memcpy(buffer, src, size);
960 		}
961 
~CUDPData()962 		~CUDPData()
963 		{
964 			delete[] buffer;
965 		}
966 	};
967 
968 public:
CAsioUDPSocketImpl(const amuleIPV4Address & address,int,CLibUDPSocket * libSocket)969 	CAsioUDPSocketImpl(const amuleIPV4Address &address, int /* flags */, CLibUDPSocket * libSocket) :
970 		m_libSocket(libSocket),
971 		m_strand(s_io_service),
972 		m_timer(s_io_service),
973 		m_address(address)
974 	{
975 		m_muleSocket = NULL;
976 		m_socket = NULL;
977 		m_readBuffer = new char[CMuleUDPSocket::UDP_BUFFER_SIZE];
978 		m_OK = true;
979 		CreateSocket();
980 	}
981 
~CAsioUDPSocketImpl()982 	~CAsioUDPSocketImpl()
983 	{
984 		AddDebugLogLineF(logAsio, wxT("UDP ~CAsioUDPSocketImpl"));
985 		delete m_socket;
986 		delete[] m_readBuffer;
987 		DeleteContents(m_receiveBuffers);
988 	}
989 
SetClientData(CMuleUDPSocket * muleSocket)990 	void SetClientData(CMuleUDPSocket * muleSocket)
991 	{
992 		AddDebugLogLineF(logAsio, wxT("UDP SetClientData"));
993 		m_muleSocket = muleSocket;
994 	}
995 
RecvFrom(amuleIPV4Address & addr,void * buf,uint32 nBytes)996 	uint32 RecvFrom(amuleIPV4Address& addr, void* buf, uint32 nBytes)
997 	{
998 		CUDPData * recdata;
999 		{
1000 			wxMutexLocker lock(m_receiveBuffersLock);
1001 			if (m_receiveBuffers.empty()) {
1002 				AddDebugLogLineN(logAsio, wxT("UDP RecvFromError no data"));
1003 				return 0;
1004 			}
1005 			recdata = * m_receiveBuffers.begin();
1006 			m_receiveBuffers.pop_front();
1007 		}
1008 		uint32 read = recdata->size;
1009 		if (read > nBytes) {
1010 			// should not happen
1011 			AddDebugLogLineN(logAsio, CFormat(wxT("UDP RecvFromError too much data %d")) % read);
1012 			read = nBytes;
1013 		}
1014 		memcpy(buf, recdata->buffer, read);
1015 		addr = recdata->ipadr;
1016 		delete recdata;
1017 		return read;
1018 	}
1019 
SendTo(const amuleIPV4Address & addr,const void * buf,uint32 nBytes)1020 	uint32 SendTo(const amuleIPV4Address& addr, const void* buf, uint32 nBytes)
1021 	{
1022 		// Collect data, make a copy of the buffer's content
1023 		CUDPData * recdata = new CUDPData(buf, nBytes, addr);
1024 		AddDebugLogLineF(logAsio, CFormat(wxT("UDP SendTo %d to %s")) % nBytes % addr.IPAddress());
1025 		m_strand.dispatch(boost::bind(& CAsioUDPSocketImpl::DispatchSendTo, this, recdata));
1026 		return nBytes;
1027 	}
1028 
IsOk() const1029 	bool IsOk() const
1030 	{
1031 		return m_OK;
1032 	}
1033 
Close()1034 	void Close()
1035 	{
1036 		if (s_io_service.stopped()) {
1037 			DispatchClose();
1038 		} else {
1039 			m_strand.dispatch(boost::bind(& CAsioUDPSocketImpl::DispatchClose, this));
1040 		}
1041 	}
1042 
Destroy()1043 	void Destroy()
1044 	{
1045 		AddDebugLogLineF(logAsio, CFormat(wxT("Destroy() %p %p")) % m_libSocket % this);
1046 		Close();
1047 		if (s_io_service.stopped()) {
1048 			HandleDestroy();
1049 		} else {
1050 			// Close prevents creation of any more callbacks, but does not clear any callbacks already
1051 			// sitting in Asio's event queue (I have seen such a crash).
1052 			// So create a delay timer so they can be called until core is notified.
1053 			m_timer.expires_from_now(boost::posix_time::seconds(1));
1054 			m_timer.async_wait(m_strand.wrap(boost::bind(& CAsioUDPSocketImpl::HandleDestroy, this)));
1055 		}
1056 	}
1057 
1058 
1059 private:
1060 	//
1061 	// Dispatch handlers
1062 	// Access to m_socket is all bundled in the thread running s_io_service to avoid
1063 	// concurrent access to the socket from several threads.
1064 	// So once things are running (after connect), all access goes through one of these handlers.
1065 	//
DispatchClose()1066 	void DispatchClose()
1067 	{
1068 		error_code ec;
1069 		m_socket->close(ec);
1070 		if (ec) {
1071 			AddDebugLogLineC(logAsio, CFormat(wxT("UDP Close error %s")) % ec.message());
1072 		} else {
1073 			AddDebugLogLineF(logAsio, wxT("UDP Closed"));
1074 		}
1075 	}
1076 
DispatchSendTo(CUDPData * recdata)1077 	void DispatchSendTo(CUDPData * recdata)
1078 	{
1079 		ip::udp::endpoint endpoint(recdata->ipadr.GetEndpoint().address(), recdata->ipadr.Service());
1080 
1081 		AddDebugLogLineF(logAsio, CFormat(wxT("UDP DispatchSendTo %d to %s:%d")) % recdata->size
1082 			% endpoint.address().to_string() % endpoint.port());
1083 		m_socket->async_send_to(buffer(recdata->buffer, recdata->size), endpoint,
1084 			m_strand.wrap(boost::bind(& CAsioUDPSocketImpl::HandleSendTo, this, placeholders::error, placeholders::bytes_transferred, recdata)));
1085 	}
1086 
1087 	//
1088 	// Completion handlers for async requests
1089 	//
1090 
HandleRead(const error_code & ec,size_t received)1091 	void HandleRead(const error_code & ec, size_t received)
1092 	{
1093 		if (ec) {
1094 			AddDebugLogLineN(logAsio, CFormat(wxT("UDP HandleReadError %s")) % ec.message());
1095 		} else if (received == 0) {
1096 			AddDebugLogLineF(logAsio, wxT("UDP HandleReadError nothing available"));
1097 		} else if (m_muleSocket == NULL) {
1098 			AddDebugLogLineN(logAsio, wxT("UDP HandleReadError no handler"));
1099 		} else {
1100 
1101 			amuleIPV4Address ipadr = amuleIPV4Address(CamuleIPV4Endpoint(m_receiveEndpoint));
1102 			AddDebugLogLineF(logAsio, CFormat(wxT("UDP HandleRead %d %s:%d")) % received % ipadr.IPAddress() % ipadr.Service());
1103 
1104 			// create our read buffer
1105 			CUDPData * recdata = new CUDPData(m_readBuffer, received, ipadr);
1106 			{
1107 				wxMutexLocker lock(m_receiveBuffersLock);
1108 				m_receiveBuffers.push_back(recdata);
1109 			}
1110 			CoreNotify_UDPSocketReceive(m_muleSocket);
1111 		}
1112 		StartBackgroundRead();
1113 	}
1114 
HandleSendTo(const error_code & ec,size_t sent,CUDPData * recdata)1115 	void HandleSendTo(const error_code & ec, size_t sent, CUDPData * recdata)
1116 	{
1117 		if (ec) {
1118 			AddDebugLogLineN(logAsio, CFormat(wxT("UDP HandleSendToError %s")) % ec.message());
1119 		} else if (sent != recdata->size) {
1120 			AddDebugLogLineN(logAsio, CFormat(wxT("UDP HandleSendToError tosend: %d sent %d")) % recdata->size % sent);
1121 		}
1122 		if (m_muleSocket == NULL) {
1123 			AddDebugLogLineN(logAsio, wxT("UDP HandleSendToError no handler"));
1124 		} else {
1125 			AddDebugLogLineF(logAsio, CFormat(wxT("UDP HandleSendTo %d to %s")) % sent % recdata->ipadr.IPAddress());
1126 			CoreNotify_UDPSocketSend(m_muleSocket);
1127 		}
1128 		delete recdata;
1129 	}
1130 
HandleDestroy()1131 	void HandleDestroy()
1132 	{
1133 		AddDebugLogLineF(logAsio, CFormat(wxT("HandleDestroy() %p %p")) % m_libSocket % this);
1134 		delete m_libSocket;
1135 	}
1136 
1137 	//
1138 	// Other functions
1139 	//
1140 
CreateSocket()1141 	void CreateSocket()
1142 	{
1143 		try {
1144 			delete m_socket;
1145 			ip::udp::endpoint endpoint(m_address.GetEndpoint().address(), m_address.Service());
1146 			m_socket = new ip::udp::socket(s_io_service, endpoint);
1147 			AddDebugLogLineN(logAsio, CFormat(wxT("Created UDP socket %s %d")) % m_address.IPAddress() % m_address.Service());
1148 			StartBackgroundRead();
1149 		} catch (const system_error& err) {
1150 			AddLogLineC(CFormat(wxT("Error creating UDP socket %s %d : %s")) % m_address.IPAddress() % m_address.Service() % err.code().message());
1151 			m_socket = NULL;
1152 			m_OK = false;
1153 		}
1154 	}
1155 
StartBackgroundRead()1156 	void StartBackgroundRead()
1157 	{
1158 		m_socket->async_receive_from(buffer(m_readBuffer, CMuleUDPSocket::UDP_BUFFER_SIZE), m_receiveEndpoint,
1159 			m_strand.wrap(boost::bind(& CAsioUDPSocketImpl::HandleRead, this, placeholders::error, placeholders::bytes_transferred)));
1160 	}
1161 
1162 	CLibUDPSocket *		m_libSocket;
1163 	ip::udp::socket *	m_socket;
1164 	CMuleUDPSocket *	m_muleSocket;
1165 	bool				m_OK;
1166 	io_service::strand	m_strand;		// handle synchronisation in io_service thread pool
1167 	deadline_timer		m_timer;
1168 	amuleIPV4Address	m_address;
1169 
	// One fixed receive buffer
1171 	char *				m_readBuffer;
1172 	// and a list of dynamic buffers. UDP data may be coming in faster
1173 	// than the main loop can handle it.
1174 	std::list<CUDPData *>	m_receiveBuffers;
1175 	wxMutex				m_receiveBuffersLock;
1176 
1177 	// Address of last reception
1178 	ip::udp::endpoint	m_receiveEndpoint;
1179 };
1180 
1181 
1182 /**
1183  * Library UDP socket wrapper
1184  */
1185 
CLibUDPSocket(amuleIPV4Address & address,int flags)1186 CLibUDPSocket::CLibUDPSocket(amuleIPV4Address &address, int flags)
1187 {
1188 	m_aSocket = new CAsioUDPSocketImpl(address, flags, this);
1189 }
1190 
1191 
~CLibUDPSocket()1192 CLibUDPSocket::~CLibUDPSocket()
1193 {
1194 	AddDebugLogLineF(logAsio, CFormat(wxT("~CLibUDPSocket() %p %p")) % this % m_aSocket);
1195 	delete m_aSocket;
1196 }
1197 
1198 
IsOk() const1199 bool CLibUDPSocket::IsOk() const
1200 {
1201 	return m_aSocket->IsOk();
1202 }
1203 
1204 
RecvFrom(amuleIPV4Address & addr,void * buf,uint32 nBytes)1205 uint32 CLibUDPSocket::RecvFrom(amuleIPV4Address& addr, void* buf, uint32 nBytes)
1206 {
1207 	return m_aSocket->RecvFrom(addr, buf, nBytes);
1208 }
1209 
1210 
SendTo(const amuleIPV4Address & addr,const void * buf,uint32 nBytes)1211 uint32 CLibUDPSocket::SendTo(const amuleIPV4Address& addr, const void* buf, uint32 nBytes)
1212 {
1213 	return m_aSocket->SendTo(addr, buf, nBytes);
1214 }
1215 
1216 
SetClientData(CMuleUDPSocket * muleSocket)1217 void CLibUDPSocket::SetClientData(CMuleUDPSocket * muleSocket)
1218 {
1219 	m_aSocket->SetClientData(muleSocket);
1220 }
1221 
1222 
LastError() const1223 int CLibUDPSocket::LastError() const
1224 {
1225 	return !IsOk();
1226 }
1227 
1228 
Close()1229 void CLibUDPSocket::Close()
1230 {
1231 	m_aSocket->Close();
1232 }
1233 
1234 
Destroy()1235 void CLibUDPSocket::Destroy()
1236 {
1237 	m_aSocket->Destroy();
1238 }
1239 
1240 
1241 /**
1242  * CAsioService - ASIO event loop thread
1243  */
1244 
1245 class CAsioServiceThread : public wxThread {
1246 public:
CAsioServiceThread()1247 	CAsioServiceThread() : wxThread(wxTHREAD_JOINABLE)
1248 	{
1249 		static int count = 0;
1250 		m_threadNumber = ++count;
1251 		Create();
1252 		Run();
1253 	}
1254 
Entry()1255 	void * Entry()
1256 	{
1257 		AddLogLineNS(CFormat(_("Asio thread %d started")) % m_threadNumber);
1258 		io_service::work worker(s_io_service);		// keep io_service running
1259 		s_io_service.run();
1260 		AddDebugLogLineN(logAsio, CFormat(wxT("Asio thread %d stopped")) % m_threadNumber);
1261 
1262 		return NULL;
1263 	}
1264 
1265 private:
1266 	int m_threadNumber;
1267 };
1268 
1269 /**
1270  * The constructor starts the thread.
1271  */
CAsioService()1272 CAsioService::CAsioService()
1273 {
1274 	m_threads = new CAsioServiceThread[m_numberOfThreads];
1275 }
1276 
1277 
~CAsioService()1278 CAsioService::~CAsioService()
1279 {
1280 }
1281 
1282 
Stop()1283 void CAsioService::Stop()
1284 {
1285 	if (!m_threads) {
1286 		return;
1287 	}
1288 	s_io_service.stop();
1289 	// Wait for threads to exit
1290 	for (int i = 0; i < m_numberOfThreads; i++) {
1291 		CAsioServiceThread * t = m_threads + i;
1292 		t->Wait();
1293 	}
1294 	delete[] m_threads;
1295 	m_threads = 0;
1296 }
1297 
1298 
1299 
1300 
1301 
1302 /**
1303  * amuleIPV4Address
1304  */
1305 
amuleIPV4Address()1306 amuleIPV4Address::amuleIPV4Address()
1307 {
1308 	m_endpoint = new CamuleIPV4Endpoint();
1309 }
1310 
// Copy constructor: takes an independent copy of the other object's endpoint.
// (Allocates directly instead of going through operator=, because
// m_endpoint holds no valid pointer yet at this point.)
amuleIPV4Address::amuleIPV4Address(const amuleIPV4Address &a)
{
	m_endpoint = new CamuleIPV4Endpoint(* a.m_endpoint);
}

// Constructs the address from a copy of the given endpoint.
amuleIPV4Address::amuleIPV4Address(const CamuleIPV4Endpoint &ep)
{
	m_endpoint = new CamuleIPV4Endpoint(ep);
}

// Frees the owned endpoint.
amuleIPV4Address::~amuleIPV4Address()
{
	delete m_endpoint;
}

// Copy assignment. The new endpoint is created before the old one is
// released, so self-assignment is safe and the previous endpoint is no
// longer leaked.
amuleIPV4Address& amuleIPV4Address::operator=(const amuleIPV4Address &a)
{
	CamuleIPV4Endpoint * replacement = new CamuleIPV4Endpoint(* a.m_endpoint);
	delete m_endpoint;
	m_endpoint = replacement;
	return *this;
}

// Assignment from an endpoint. As above, copy first and free afterwards so
// that assigning the object's own endpoint (via GetEndpoint()) stays valid
// and the previous endpoint is not leaked.
amuleIPV4Address& amuleIPV4Address::operator=(const CamuleIPV4Endpoint &ep)
{
	CamuleIPV4Endpoint * replacement = new CamuleIPV4Endpoint(ep);
	delete m_endpoint;
	m_endpoint = replacement;
	return *this;
}
1337 
Hostname(const wxString & name)1338 bool amuleIPV4Address::Hostname(const wxString& name)
1339 {
1340 	if (name.IsEmpty()) {
1341 		return false;
1342 	}
1343 	// This is usually just an IP.
1344 	std::string sname(unicode2char(name));
1345 	error_code ec;
1346 	ip::address_v4 adr = ip::address_v4::from_string(sname, ec);
1347 	if (!ec) {
1348 		m_endpoint->address(adr);
1349 		return true;
1350 	}
1351 	AddDebugLogLineN(logAsio, CFormat(wxT("Hostname(\"%s\") failed, not an IP address %s")) % name % ec.message());
1352 
1353 	// Try to resolve (sync). Normally not required. Unless you type in your hostname as "local IP address" or something.
1354 	error_code ec2;
1355 	ip::tcp::resolver res(s_io_service);
1356 	// We only want to get IPV4 addresses.
1357 	ip::tcp::resolver::query query(ip::tcp::v4(), sname, "");
1358 	ip::tcp::resolver::iterator endpoint_iterator = res.resolve(query, ec2);
1359 	if (ec2) {
1360 		AddDebugLogLineN(logAsio, CFormat(wxT("Hostname(\"%s\") resolve failed: %s")) % name % ec2.message());
1361 		return false;
1362 	}
1363 	if (endpoint_iterator == ip::tcp::resolver::iterator()) {
1364 		AddDebugLogLineN(logAsio, CFormat(wxT("Hostname(\"%s\") resolve failed: no address found")) % name);
1365 		return false;
1366 	}
1367 	m_endpoint->address(endpoint_iterator->endpoint().address());
1368 	AddDebugLogLineN(logAsio, CFormat(wxT("Hostname(\"%s\") resolved to %s")) % name % IPAddress());
1369 	return true;
1370 }
1371 
Service(uint16 service)1372 bool amuleIPV4Address::Service(uint16 service)
1373 {
1374 	if (service == 0) {
1375 		return false;
1376 	}
1377 	m_endpoint->port(service);
1378 	return true;
1379 }
1380 
Service() const1381 uint16 amuleIPV4Address::Service() const
1382 {
1383 	return m_endpoint->port();
1384 }
1385 
IsLocalHost() const1386 bool amuleIPV4Address::IsLocalHost() const
1387 {
1388 	return m_endpoint->address().is_loopback();
1389 }
1390 
IPAddress() const1391 wxString amuleIPV4Address::IPAddress() const
1392 {
1393 	return CFormat(wxT("%s")) % m_endpoint->address().to_string();
1394 }
1395 
1396 // "Set address to any of the addresses of the current machine."
1397 // This just sets the address to 0.0.0.0 .
1398 // wx does the same.
AnyAddress()1399 bool amuleIPV4Address::AnyAddress()
1400 {
1401 	m_endpoint->address(ip::address_v4::any());
1402 	AddDebugLogLineN(logAsio, CFormat(wxT("AnyAddress: set to %s")) % IPAddress());
1403 	return true;
1404 }
1405 
GetEndpoint() const1406 const CamuleIPV4Endpoint & amuleIPV4Address::GetEndpoint() const
1407 {
1408 	return * m_endpoint;
1409 }
1410 
GetEndpoint()1411 CamuleIPV4Endpoint & amuleIPV4Address::GetEndpoint()
1412 {
1413 	return * m_endpoint;
1414 }
1415 
1416 
1417 //
1418 // Notification stuff
1419 //
1420 namespace MuleNotify
1421 {
1422 
LibSocketConnect(CLibSocket * socket,int error)1423 	void LibSocketConnect(CLibSocket * socket, int error)
1424 	{
1425 		if (socket->IsDestroying()) {
1426 			AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketConnect Destroying %s %d")) % socket->GetIP() % error);
1427 		} else if (socket->GetProxyState()) {
1428 			AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketConnect Proxy %s %d")) % socket->GetIP() % error);
1429 			socket->OnProxyEvent(MULE_SOCKET_CONNECTION);
1430 		} else {
1431 			AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketConnect %s %d")) %socket->GetIP() % error);
1432 			socket->OnConnect(error);
1433 		}
1434 	}
1435 
LibSocketSend(CLibSocket * socket,int error)1436 	void LibSocketSend(CLibSocket * socket, int error)
1437 	{
1438 		if (socket->IsDestroying()) {
1439 			AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketSend Destroying %s %d")) % socket->GetIP() % error);
1440 		} else if (socket->GetProxyState()) {
1441 			AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketSend Proxy %s %d")) % socket->GetIP() % error);
1442 			socket->OnProxyEvent(MULE_SOCKET_OUTPUT);
1443 		} else {
1444 			AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketSend %s %d")) % socket->GetIP() % error);
1445 			socket->OnSend(error);
1446 		}
1447 	}
1448 
LibSocketReceive(CLibSocket * socket,int error)1449 	void LibSocketReceive(CLibSocket * socket, int error)
1450 	{
1451 		socket->EventProcessed();
1452 		if (socket->IsDestroying()) {
1453 			AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketReceive Destroying %s %d")) % socket->GetIP() % error);
1454 		} else if (socket->GetProxyState()) {
1455 			AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketReceive Proxy %s %d")) % socket->GetIP() % error);
1456 			socket->OnProxyEvent(MULE_SOCKET_INPUT);
1457 		} else {
1458 			AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketReceive %s %d")) % socket->GetIP() % error);
1459 			socket->OnReceive(error);
1460 		}
1461 	}
1462 
LibSocketLost(CLibSocket * socket)1463 	void LibSocketLost(CLibSocket * socket)
1464 	{
1465 		if (socket->IsDestroying()) {
1466 			AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketLost Destroying %s")) % socket->GetIP());
1467 		} else if (socket->GetProxyState()) {
1468 			AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketLost Proxy %s")) % socket->GetIP());
1469 			socket->OnProxyEvent(MULE_SOCKET_LOST);
1470 		} else {
1471 			AddDebugLogLineF(logAsio, CFormat(wxT("LibSocketLost %s")) % socket->GetIP());
1472 			socket->OnLost();
1473 		}
1474 	}
1475 
LibSocketDestroy(CLibSocket * socket)1476 	void LibSocketDestroy(CLibSocket * socket)
1477 	{
1478 		AddDebugLogLineF(logAsio, CFormat(wxT("LibSocket_Destroy %s")) % socket->GetIP());
1479 		delete socket;
1480 	}
1481 
ProxySocketEvent(CLibSocket * socket,int evt)1482 	void ProxySocketEvent(CLibSocket * socket, int evt)
1483 	{
1484 		AddDebugLogLineF(logAsio, CFormat(wxT("ProxySocketEvent %s %d")) % socket->GetIP() % evt);
1485 		socket->OnProxyEvent(evt);
1486 	}
1487 
ServerTCPAccept(CLibSocketServer * socketServer)1488 	void ServerTCPAccept(CLibSocketServer * socketServer)
1489 	{
1490 		AddDebugLogLineF(logAsio, wxT("ServerTCP_Accept"));
1491 		socketServer->OnAccept();
1492 	}
1493 
UDPSocketSend(CMuleUDPSocket * socket)1494 	void UDPSocketSend(CMuleUDPSocket * socket)
1495 	{
1496 		AddDebugLogLineF(logAsio, wxT("UDPSocketSend"));
1497 		socket->OnSend(0);
1498 	}
1499 
UDPSocketReceive(CMuleUDPSocket * socket)1500 	void UDPSocketReceive(CMuleUDPSocket * socket)
1501 	{
1502 		AddDebugLogLineF(logAsio, wxT("UDPSocketReceive"));
1503 		socket->OnReceive(0);
1504 	}
1505 
1506 
1507 } // namespace MuleNotify
1508 
1509 //
1510 // Initialize MuleBoostVersion
1511 //
1512 wxString MuleBoostVersion = CFormat(wxT("%d.%d")) % (BOOST_VERSION / 100000) % (BOOST_VERSION / 100 % 1000);
1513