1 /* Copyright (C) 2007,2008 by Marc Maurer <uwog@uwog.net> 2 * 3 * This program is free software; you can redistribute it and/or 4 * modify it under the terms of the GNU General Public License 5 * as published by the Free Software Foundation; either version 2 6 * of the License, or (at your option) any later version. 7 * 8 * This program is distributed in the hope that it will be useful, 9 * but WITHOUT ANY WARRANTY; without even the implied warranty of 10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11 * GNU General Public License for more details. 12 * 13 * You should have received a copy of the GNU General Public License 14 * along with this program; if not, write to the Free Software 15 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 16 * 02110-1301 USA. 17 */ 18 19 #ifndef __SESSION__ 20 #define __SESSION__ 21 22 #include <boost/function.hpp> 23 #include <boost/bind.hpp> 24 #include <boost/enable_shared_from_this.hpp> 25 #include <boost/noncopyable.hpp> 26 #include <deque> 27 #include <sync/xp/lock.h> 28 #include <sync/xp/Synchronizer.h> 29 30 // 64MB seems reasonable enough for now... 31 #define MAX_PACKET_DATA_SIZE 64*1024*1024 32 33 class TCPAccountHandler; 34 35 class Session : public Synchronizer, public asio::noncopyable, public boost::enable_shared_from_this<Session> 36 { 37 public: Session(asio::io_service & io_service,boost::function<void (boost::shared_ptr<Session>)> ef)38 Session(asio::io_service& io_service, boost::function<void (boost::shared_ptr<Session>)> ef) 39 : Synchronizer(boost::bind(&Session::_signal, this)), 40 socket(io_service), 41 queue_protector(), 42 m_ef(ef) 43 { 44 } 45 connect(asio::ip::tcp::resolver::iterator & iterator)46 void connect(asio::ip::tcp::resolver::iterator& iterator) 47 { 48 socket.connect(*iterator); 49 } 50 51 // TODO: don't expose this getSocket()52 asio::ip::tcp::socket& getSocket() 53 { 54 return socket; 55 } 56 getRemoteAddress()57 std::string getRemoteAddress() 58 { 59 return socket.remote_endpoint().address().to_string(); 60 } 61 getRemotePort()62 unsigned short getRemotePort() 63 { 64 return socket.remote_endpoint().port(); 65 } 66 push(int size,char * data)67 void push(int size, char* data) 68 { 69 { 70 abicollab::scoped_lock lock(queue_protector); 71 incoming.push_back( std::pair<int, char*>(size, data) ); 72 } 73 Synchronizer::signal(); 74 } 75 76 /* 77 Only called fron the abiword main loop 78 */ pop(int & size,char ** data)79 bool pop(int& size, char** data) 80 { 81 if (incoming.size() == 0) 82 return false; 83 { 84 abicollab::scoped_lock lock(queue_protector); 85 std::pair<int, char*> p = incoming.front(); 86 size = p.first; 87 *data = p.second; 88 incoming.pop_front(); 89 } 90 return true; 91 } 92 asyncReadHeader()93 void asyncReadHeader() 94 { 95 UT_DEBUGMSG(("Session::asyncReadHeader()\n")); 96 packet_data = 0; // just to be sure we'll never touch a datablock we might have read before 97 asio::async_read(socket, 98 asio::buffer(&packet_size, 4), 99 boost::bind(&Session::asyncReadHeaderHandler, shared_from_this(), asio::placeholders::error, asio::placeholders::bytes_transferred)); 100 } 101 asyncWrite(int size,const char * data)102 void asyncWrite(int size, const char* data) 103 { 104 // TODO: this is a race condition, mutex this 105 bool writeInProgress = outgoing.size() > 0; 106 107 // FIXME: inefficient memory copy 108 char* store_data = reinterpret_cast<char*>(g_malloc(size)); 109 memcpy(store_data, data, size); 110 outgoing.push_back(std::pair<int, char*>(size, store_data)); 111 112 if (!writeInProgress) 113 { 114 packet_size_write = size; 115 packet_data_write = store_data; 116 117 UT_DEBUGMSG(("sending datablock of length: %d\n", packet_size_write)); 118 asio::async_write(socket, 119 asio::buffer(&packet_size_write, 4), 120 boost::bind(&Session::asyncWriteHeaderHandler, shared_from_this(), asio::placeholders::error)); 121 } 122 } 123 124 /* 125 Only called fron the abiword main loop 126 */ isConnected()127 bool isConnected() 128 { 129 return socket.is_open(); 130 } 131 disconnect()132 void disconnect() 133 { 134 UT_DEBUGMSG(("Session::disconnect()\n")); 135 if (socket.is_open()) 136 { 137 asio::error_code ecs; 138 socket.shutdown(asio::ip::tcp::socket::shutdown_both, ecs); 139 if (ecs) { 140 UT_DEBUGMSG(("Error shutting down socket: %s\n", ecs.message().c_str())); 141 } 142 asio::error_code ecc; 143 socket.close(ecc); 144 if (ecc) { 145 UT_DEBUGMSG(("Error closing socket: %s\n", ecc.message().c_str())); 146 } 147 } 148 UT_DEBUGMSG(("Socket closed, signalling mainloop\n")); 149 signal(); 150 } 151 152 private: _signal()153 void _signal() 154 { 155 UT_DEBUGMSG(("Session::_signal()\n")); 156 m_ef(shared_from_this()); 157 } 158 asyncReadHeaderHandler(const asio::error_code & error,std::size_t bytes_transferred)159 void asyncReadHeaderHandler(const asio::error_code& error, 160 std::size_t bytes_transferred) 161 { 162 if (error) 163 { 164 UT_DEBUGMSG(("asyncReadHeaderHandler error: %s\n", error.message().c_str())); 165 disconnect(); 166 return; 167 } 168 169 if (bytes_transferred != 4) 170 { 171 UT_ASSERT_HARMLESS(UT_SHOULD_NOT_HAPPEN); 172 disconnect(); // TODO: should not happen, handle this 173 return; 174 } 175 176 if (packet_size < 0 || packet_size > MAX_PACKET_DATA_SIZE) 177 { 178 UT_DEBUGMSG(("Packet size (%d bytes) error - min size: 0, max size %d\n", packet_size, MAX_PACKET_DATA_SIZE)); 179 disconnect(); 180 return; 181 } 182 183 UT_DEBUGMSG(("going to read datablock of length: %d\n", packet_size)); 184 // now continue reading the packet data 185 packet_data = reinterpret_cast<char*>(g_malloc(packet_size)); 186 asio::async_read(socket, 187 asio::buffer(packet_data, packet_size), 188 boost::bind(&Session::asyncReadHandler, shared_from_this(), asio::placeholders::error, asio::placeholders::bytes_transferred)); 189 } 190 asyncReadHandler(const asio::error_code & error,std::size_t bytes_transferred)191 void asyncReadHandler(const asio::error_code& error, 192 std::size_t bytes_transferred) 193 { 194 if (error) 195 { 196 UT_DEBUGMSG(("asyncReadHandler generic error\n")); 197 disconnect(); 198 return; 199 } 200 201 if (bytes_transferred != std::size_t(packet_size)) 202 { 203 UT_ASSERT_HARMLESS(UT_SHOULD_NOT_HAPPEN); 204 disconnect(); // TODO: should not happen, handle this 205 return; 206 } 207 208 push(packet_size, packet_data); 209 // start over for a new packet 210 asyncReadHeader(); 211 } 212 asyncWriteHeaderHandler(const asio::error_code & ec)213 void asyncWriteHeaderHandler(const asio::error_code& ec) 214 { 215 UT_DEBUGMSG(("Session::asyncWriteHeaderHandler()\n")); 216 if (ec) 217 { 218 UT_DEBUGMSG(("asyncWriteHeaderHandler generic error\n")); 219 disconnect(); 220 return; 221 } 222 223 // write the packet body 224 asio::async_write(socket, 225 asio::buffer(packet_data_write, packet_size_write), 226 boost::bind(&Session::asyncWriteHandler, shared_from_this(), asio::placeholders::error)); 227 } 228 asyncWriteHandler(const asio::error_code & ec)229 void asyncWriteHandler(const asio::error_code& ec) 230 { 231 UT_DEBUGMSG(("Session::asyncWriteHandler()\n")); 232 FREEP(packet_data_write); 233 if (ec) 234 { 235 UT_DEBUGMSG(("asyncWriteHandler generic error\n")); 236 disconnect(); 237 return; 238 } 239 240 // TODO: this is a race condition, mutex this 241 outgoing.pop_front(); 242 if (outgoing.size() > 0) 243 { 244 std::pair<int, char*> p = outgoing.front(); 245 packet_size_write = p.first; 246 packet_data_write = p.second; 247 248 UT_DEBUGMSG(("sending datablock of length: %d\n", packet_size_write)); 249 250 asio::async_write(socket, 251 asio::buffer(&packet_size_write, 4), 252 boost::bind(&Session::asyncWriteHeaderHandler, shared_from_this(), asio::placeholders::error)); 253 } 254 } 255 256 asio::ip::tcp::socket socket; 257 abicollab::mutex queue_protector; 258 std::deque< std::pair<int, char*> > incoming; 259 std::deque< std::pair<int, char*> > outgoing; 260 261 int packet_size; // state needed for async reads 262 char* packet_data; // state needed for async reads 263 264 int packet_size_write; // state needed for async writes 265 char* packet_data_write; // state needed for async writes 266 267 boost::function<void (boost::shared_ptr<Session>)> m_ef; 268 }; 269 270 #endif /* __SESSION__ */ 271