1 /* Copyright (C) 2007,2008 by Marc Maurer <uwog@uwog.net>
2  *
3  * This program is free software; you can redistribute it and/or
4  * modify it under the terms of the GNU General Public License
5  * as published by the Free Software Foundation; either version 2
6  * of the License, or (at your option) any later version.
7  *
8  * This program is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11  * GNU General Public License for more details.
12  *
13  * You should have received a copy of the GNU General Public License
14  * along with this program; if not, write to the Free Software
15  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
16  * 02110-1301 USA.
17  */
18 
19 #ifndef __SESSION__
20 #define __SESSION__
21 
22 #include <boost/function.hpp>
23 #include <boost/bind.hpp>
24 #include <boost/enable_shared_from_this.hpp>
25 #include <boost/noncopyable.hpp>
26 #include <deque>
27 #include <sync/xp/lock.h>
28 #include <sync/xp/Synchronizer.h>
29 
// Upper bound on the payload size of a single packet.
// 64MB seems reasonable enough for now...
// The expression is parenthesized so the macro expands as one value inside
// larger expressions (e.g. `x % MAX_PACKET_DATA_SIZE`).
#define MAX_PACKET_DATA_SIZE (64*1024*1024)

class TCPAccountHandler;
34 
35 class Session : public Synchronizer, public asio::noncopyable, public boost::enable_shared_from_this<Session>
36 {
37 public:
Session(asio::io_service & io_service,boost::function<void (boost::shared_ptr<Session>)> ef)38 	Session(asio::io_service& io_service, boost::function<void (boost::shared_ptr<Session>)> ef)
39 		: Synchronizer(boost::bind(&Session::_signal, this)),
40 		socket(io_service),
41 		queue_protector(),
42 		m_ef(ef)
43 	{
44 	}
45 
connect(asio::ip::tcp::resolver::iterator & iterator)46 	void connect(asio::ip::tcp::resolver::iterator& iterator)
47 	{
48 		socket.connect(*iterator);
49 	}
50 
51 	// TODO: don't expose this
getSocket()52 	asio::ip::tcp::socket& getSocket()
53 	{
54 		return socket;
55 	}
56 
getRemoteAddress()57 	std::string getRemoteAddress()
58 	{
59 		return socket.remote_endpoint().address().to_string();
60 	}
61 
getRemotePort()62 	unsigned short getRemotePort()
63 	{
64 		return socket.remote_endpoint().port();
65 	}
66 
push(int size,char * data)67 	void push(int size, char* data)
68 	{
69 		{
70 			abicollab::scoped_lock lock(queue_protector);
71 			incoming.push_back( std::pair<int, char*>(size, data) );
72 		}
73 		Synchronizer::signal();
74 	}
75 
76 	/*
77 		Only called fron the abiword main loop
78 	*/
pop(int & size,char ** data)79 	bool pop(int& size, char** data)
80 	{
81 		if (incoming.size() == 0)
82 			return false;
83 		{
84 			abicollab::scoped_lock lock(queue_protector);
85 			std::pair<int, char*> p = incoming.front();
86 			size = p.first;
87 			*data = p.second;
88 			incoming.pop_front();
89 		}
90 		return true;
91 	}
92 
asyncReadHeader()93 	void asyncReadHeader()
94 	{
95 		UT_DEBUGMSG(("Session::asyncReadHeader()\n"));
96 		packet_data = 0; // just to be sure we'll never touch a datablock we might have read before
97 		asio::async_read(socket,
98 			asio::buffer(&packet_size, 4),
99 			boost::bind(&Session::asyncReadHeaderHandler, shared_from_this(), asio::placeholders::error, asio::placeholders::bytes_transferred));
100 	}
101 
asyncWrite(int size,const char * data)102 	void asyncWrite(int size, const char* data)
103 	{
104 		// TODO: this is a race condition, mutex this
105 		bool writeInProgress = outgoing.size() > 0;
106 
107 		// FIXME: inefficient memory copy
108 		char* store_data = reinterpret_cast<char*>(g_malloc(size));
109 		memcpy(store_data, data, size);
110 		outgoing.push_back(std::pair<int, char*>(size, store_data));
111 
112 		if (!writeInProgress)
113 		{
114 			packet_size_write = size;
115 			packet_data_write = store_data;
116 
117 			UT_DEBUGMSG(("sending datablock of length: %d\n", packet_size_write));
118 			asio::async_write(socket,
119 				asio::buffer(&packet_size_write, 4),
120 				boost::bind(&Session::asyncWriteHeaderHandler, shared_from_this(), asio::placeholders::error));
121 		}
122 	}
123 
124 	/*
125 		Only called fron the abiword main loop
126 	*/
isConnected()127 	bool isConnected()
128 	{
129 		return socket.is_open();
130 	}
131 
disconnect()132 	void disconnect()
133 	{
134 		UT_DEBUGMSG(("Session::disconnect()\n"));
135 		if (socket.is_open())
136 		{
137 			asio::error_code ecs;
138 			socket.shutdown(asio::ip::tcp::socket::shutdown_both, ecs);
139 			if (ecs) {
140 				UT_DEBUGMSG(("Error shutting down socket: %s\n", ecs.message().c_str()));
141 			}
142 			asio::error_code ecc;
143 			socket.close(ecc);
144 			if (ecc) {
145 				UT_DEBUGMSG(("Error closing socket: %s\n", ecc.message().c_str()));
146 			}
147 		}
148 		UT_DEBUGMSG(("Socket closed, signalling mainloop\n"));
149 		signal();
150 	}
151 
152 private:
_signal()153 	void _signal()
154 	{
155 		UT_DEBUGMSG(("Session::_signal()\n"));
156 		m_ef(shared_from_this());
157 	}
158 
asyncReadHeaderHandler(const asio::error_code & error,std::size_t bytes_transferred)159 	void asyncReadHeaderHandler(const asio::error_code& error,
160 		std::size_t bytes_transferred)
161 	{
162 		if (error)
163 		{
164 			UT_DEBUGMSG(("asyncReadHeaderHandler error: %s\n", error.message().c_str()));
165 			disconnect();
166 			return;
167 		}
168 
169 		if (bytes_transferred != 4)
170 		{
171 			UT_ASSERT_HARMLESS(UT_SHOULD_NOT_HAPPEN);
172 			disconnect(); // TODO: should not happen, handle this
173 			return;
174 		}
175 
176 		if (packet_size < 0 || packet_size > MAX_PACKET_DATA_SIZE)
177 		{
178 			UT_DEBUGMSG(("Packet size (%d bytes) error - min size: 0, max size %d\n", packet_size, MAX_PACKET_DATA_SIZE));
179 			disconnect();
180 			return;
181 		}
182 
183 		UT_DEBUGMSG(("going to read datablock of length: %d\n", packet_size));
184 		// now continue reading the packet data
185 		packet_data = reinterpret_cast<char*>(g_malloc(packet_size));
186 		asio::async_read(socket,
187 			asio::buffer(packet_data, packet_size),
188 			boost::bind(&Session::asyncReadHandler, shared_from_this(), asio::placeholders::error, asio::placeholders::bytes_transferred));
189 	}
190 
asyncReadHandler(const asio::error_code & error,std::size_t bytes_transferred)191 	void asyncReadHandler(const asio::error_code& error,
192 		std::size_t bytes_transferred)
193 	{
194 		if (error)
195 		{
196 			UT_DEBUGMSG(("asyncReadHandler generic error\n"));
197 			disconnect();
198 			return;
199 		}
200 
201 		if (bytes_transferred != std::size_t(packet_size))
202 		{
203 			UT_ASSERT_HARMLESS(UT_SHOULD_NOT_HAPPEN);
204 			disconnect(); // TODO: should not happen, handle this
205 			return;
206 		}
207 
208 		push(packet_size, packet_data);
209 		// start over for a new packet
210 		asyncReadHeader();
211 	}
212 
asyncWriteHeaderHandler(const asio::error_code & ec)213 	void asyncWriteHeaderHandler(const asio::error_code& ec)
214 	{
215 		UT_DEBUGMSG(("Session::asyncWriteHeaderHandler()\n"));
216 		if (ec)
217 		{
218 			UT_DEBUGMSG(("asyncWriteHeaderHandler generic error\n"));
219 			disconnect();
220 			return;
221 		}
222 
223 		// write the packet body
224 		asio::async_write(socket,
225 			asio::buffer(packet_data_write, packet_size_write),
226 			boost::bind(&Session::asyncWriteHandler, shared_from_this(), asio::placeholders::error));
227 	}
228 
asyncWriteHandler(const asio::error_code & ec)229 	void asyncWriteHandler(const asio::error_code& ec)
230 	{
231 		UT_DEBUGMSG(("Session::asyncWriteHandler()\n"));
232 		FREEP(packet_data_write);
233 		if (ec)
234 		{
235 			UT_DEBUGMSG(("asyncWriteHandler generic error\n"));
236 			disconnect();
237 			return;
238 		}
239 
240 		// TODO: this is a race condition, mutex this
241 		outgoing.pop_front();
242 		if (outgoing.size() > 0)
243 		{
244 			std::pair<int, char*> p = outgoing.front();
245 			packet_size_write = p.first;
246 			packet_data_write = p.second;
247 
248 			UT_DEBUGMSG(("sending datablock of length: %d\n", packet_size_write));
249 
250 			asio::async_write(socket,
251 				asio::buffer(&packet_size_write, 4),
252 				boost::bind(&Session::asyncWriteHeaderHandler, shared_from_this(), asio::placeholders::error));
253 		}
254 	}
255 
256 	asio::ip::tcp::socket					socket;
257 	abicollab::mutex 						queue_protector;
258 	std::deque< std::pair<int, char*> >		incoming;
259 	std::deque< std::pair<int, char*> >		outgoing;
260 
261 	int										packet_size; // state needed for async reads
262 	char*									packet_data; // state needed for async reads
263 
264 	int										packet_size_write; // state needed for async writes
265 	char*									packet_data_write; // state needed for async writes
266 
267 	boost::function<void (boost::shared_ptr<Session>)>		m_ef;
268 };
269 
270 #endif /* __SESSION__ */
271