1 /* 2 * Copyright (C) 2001-2012 Jacek Sieka, arnetheduck on gmail point com 3 * 4 * This program is free software; you can redistribute it and/or modify 5 * it under the terms of the GNU General Public License as published by 6 * the Free Software Foundation; either version 2 of the License, or 7 * (at your option) any later version. 8 * 9 * This program is distributed in the hope that it will be useful, 10 * but WITHOUT ANY WARRANTY; without even the implied warranty of 11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 * GNU General Public License for more details. 13 * 14 * You should have received a copy of the GNU General Public License 15 * along with this program; if not, write to the Free Software 16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 17 */ 18 19 #pragma once 20 21 #include "typedefs.h" 22 #include "BufferedSocketListener.h" 23 #include "Semaphore.h" 24 #include "Thread.h" 25 #include "Speaker.h" 26 #include "Util.h" 27 #include "Socket.h" 28 #include "Atomic.h" 29 30 namespace dcpp { 31 32 class BufferedSocket : public Speaker<BufferedSocketListener>, private Thread { 33 public: 34 enum Modes { 35 MODE_LINE, 36 MODE_ZPIPE, 37 MODE_DATA 38 }; 39 40 enum NatRoles { 41 NAT_NONE, 42 NAT_CLIENT, 43 NAT_SERVER 44 }; 45 46 /** 47 * BufferedSocket factory, each BufferedSocket may only be used to create one connection 48 * @param sep Line separator 49 * @return An unconnected socket 50 */ getSocket(char sep)51 static BufferedSocket* getSocket(char sep) { 52 return new BufferedSocket(sep); 53 } 54 putSocket(BufferedSocket * aSock)55 static void putSocket(BufferedSocket* aSock) { 56 if(aSock) { 57 aSock->removeListeners(); 58 aSock->shutdown(); 59 } 60 } 61 waitShutdown()62 static void waitShutdown() { 63 while(sockets > 0) 64 Thread::sleep(100); 65 } 66 67 void accept(const Socket& srv, bool secure, bool allowUntrusted); 68 void connect(const string& aAddress, uint16_t aPort, bool secure, bool allowUntrusted, bool proxy); 69 void connect(const string& aAddress, uint16_t aPort, uint16_t localPort, NatRoles natRole, bool secure, bool allowUntrusted, bool proxy); 70 71 /** Sets data mode for aBytes bytes. Must be called within onLine. */ 72 void setDataMode(int64_t aBytes = -1) { mode = MODE_DATA; dataBytes = aBytes; } 73 /** 74 * Rollback is an ugly hack to solve problems with compressed transfers where not all data received 75 * should be treated as data. 76 * Must be called from within onData. 77 */ setLineMode(size_t aRollback)78 void setLineMode(size_t aRollback) { setMode (MODE_LINE, aRollback);} 79 void setMode(Modes mode, size_t aRollback = 0); getMode()80 Modes getMode() const { return mode; } getIp()81 const string& getIp() const { return sock->getIp(); } isConnected()82 bool isConnected() const { return sock->isConnected(); } 83 isSecure()84 bool isSecure() const { return sock->isSecure(); } isTrusted()85 bool isTrusted() const { return sock->isTrusted(); } getCipherName()86 std::string getCipherName() const { return sock->getCipherName(); } getKeyprint()87 vector<uint8_t> getKeyprint() const { return sock->getKeyprint(); } 88 write(const string & aData)89 void write(const string& aData) { write(aData.data(), aData.length()); } 90 void write(const char* aBuf, size_t aLen) noexcept; 91 /** Send the file f over this socket. */ transmitFile(InputStream * f)92 void transmitFile(InputStream* f) { Lock l(cs); addTask(SEND_FILE, new SendFileInfo(f)); } 93 94 /** Send an updated signal to all listeners */ updated()95 void updated() { Lock l(cs); addTask(UPDATED, 0); } 96 97 void disconnect(bool graceless = false) noexcept { Lock l(cs); if(graceless) disconnecting = true; addTask(DISCONNECT, 0); } 98 getLocalIp()99 string getLocalIp() const { return sock->getLocalIp(); } getLocalPort()100 uint16_t getLocalPort() const { return sock->getLocalPort(); } 101 102 GETSET(char, separator, Separator) 103 private: 104 enum Tasks { 105 CONNECT, 106 DISCONNECT, 107 SEND_DATA, 108 SEND_FILE, 109 SHUTDOWN, 110 ACCEPTED, 111 UPDATED 112 }; 113 114 enum State { 115 STARTING, // Waiting for CONNECT/ACCEPTED/SHUTDOWN 116 RUNNING, 117 FAILED 118 }; 119 120 struct TaskData { ~TaskDataTaskData121 virtual ~TaskData() { } 122 }; 123 struct ConnectInfo : public TaskData { ConnectInfoConnectInfo124 ConnectInfo(string addr_, uint16_t port_, uint16_t localPort_, NatRoles natRole_, bool proxy_) : addr(addr_), port(port_), localPort(localPort_), natRole(natRole_), proxy(proxy_) { } 125 string addr; 126 uint16_t port; 127 uint16_t localPort; 128 NatRoles natRole; 129 bool proxy; 130 }; 131 struct SendFileInfo : public TaskData { SendFileInfoSendFileInfo132 SendFileInfo(InputStream* stream_) : stream(stream_) { } 133 InputStream* stream; 134 }; 135 136 BufferedSocket(char aSeparator); 137 138 virtual ~BufferedSocket(); 139 140 CriticalSection cs; 141 142 Semaphore taskSem; 143 deque<pair<Tasks, unique_ptr<TaskData> > > tasks; 144 145 Modes mode; 146 std::unique_ptr<UnZFilter> filterIn; 147 int64_t dataBytes; 148 size_t rollback; 149 string line; 150 ByteVector inbuf; 151 ByteVector writeBuf; 152 ByteVector sendBuf; 153 154 std::unique_ptr<Socket> sock; 155 State state; 156 bool disconnecting; 157 158 virtual int run(); 159 160 void threadConnect(const string& aAddr, uint16_t aPort, uint16_t localPort, NatRoles natRole, bool proxy); 161 void threadAccept(); 162 void threadRead(); 163 void threadSendFile(InputStream* is); 164 void threadSendData(); 165 166 void fail(const string& aError); 167 static Atomic<long,memory_ordering_strong> sockets; 168 169 bool checkEvents(); 170 void checkSocket(); 171 172 void setSocket(std::unique_ptr<Socket> s); 173 void shutdown(); 174 void addTask(Tasks task, TaskData* data); 175 }; 176 177 } // namespace dcpp 178