1 /*
2  * Copyright (C) 2001-2012 Jacek Sieka, arnetheduck on gmail point com
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2 of the License, or
7  * (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17  */
18 
19 #pragma once
20 
21 #include "typedefs.h"
22 #include "BufferedSocketListener.h"
23 #include "Semaphore.h"
24 #include "Thread.h"
25 #include "Speaker.h"
26 #include "Util.h"
27 #include "Socket.h"
28 #include "Atomic.h"
29 
30 namespace dcpp {
31 
32 class BufferedSocket : public Speaker<BufferedSocketListener>, private Thread {
33 public:
34     enum Modes {
35         MODE_LINE,
36         MODE_ZPIPE,
37         MODE_DATA
38     };
39 
40     enum NatRoles {
41         NAT_NONE,
42         NAT_CLIENT,
43         NAT_SERVER
44     };
45 
46     /**
47      * BufferedSocket factory, each BufferedSocket may only be used to create one connection
48      * @param sep Line separator
49      * @return An unconnected socket
50      */
getSocket(char sep)51     static BufferedSocket* getSocket(char sep) {
52         return new BufferedSocket(sep);
53     }
54 
putSocket(BufferedSocket * aSock)55     static void putSocket(BufferedSocket* aSock) {
56         if(aSock) {
57             aSock->removeListeners();
58             aSock->shutdown();
59         }
60     }
61 
waitShutdown()62     static void waitShutdown() {
63         while(sockets > 0)
64             Thread::sleep(100);
65     }
66 
67     void accept(const Socket& srv, bool secure, bool allowUntrusted);
68     void connect(const string& aAddress, uint16_t aPort, bool secure, bool allowUntrusted, bool proxy);
69     void connect(const string& aAddress, uint16_t aPort, uint16_t localPort, NatRoles natRole, bool secure, bool allowUntrusted, bool proxy);
70 
71     /** Sets data mode for aBytes bytes. Must be called within onLine. */
72     void setDataMode(int64_t aBytes = -1) { mode = MODE_DATA; dataBytes = aBytes; }
73     /**
74      * Rollback is an ugly hack to solve problems with compressed transfers where not all data received
75      * should be treated as data.
76      * Must be called from within onData.
77      */
setLineMode(size_t aRollback)78     void setLineMode(size_t aRollback) { setMode (MODE_LINE, aRollback);}
79     void setMode(Modes mode, size_t aRollback = 0);
getMode()80     Modes getMode() const { return mode; }
getIp()81     const string& getIp() const { return sock->getIp(); }
isConnected()82     bool isConnected() const { return sock->isConnected(); }
83 
isSecure()84     bool isSecure() const { return sock->isSecure(); }
isTrusted()85     bool isTrusted() const { return sock->isTrusted(); }
getCipherName()86     std::string getCipherName() const { return sock->getCipherName(); }
getKeyprint()87     vector<uint8_t> getKeyprint() const { return sock->getKeyprint(); }
88 
write(const string & aData)89     void write(const string& aData) { write(aData.data(), aData.length()); }
90     void write(const char* aBuf, size_t aLen) noexcept;
91     /** Send the file f over this socket. */
transmitFile(InputStream * f)92     void transmitFile(InputStream* f) { Lock l(cs); addTask(SEND_FILE, new SendFileInfo(f)); }
93 
94     /** Send an updated signal to all listeners */
updated()95     void updated() { Lock l(cs); addTask(UPDATED, 0); }
96 
97     void disconnect(bool graceless = false) noexcept { Lock l(cs); if(graceless) disconnecting = true; addTask(DISCONNECT, 0); }
98 
getLocalIp()99     string getLocalIp() const { return sock->getLocalIp(); }
getLocalPort()100     uint16_t getLocalPort() const { return sock->getLocalPort(); }
101 
102     GETSET(char, separator, Separator)
103 private:
104     enum Tasks {
105         CONNECT,
106         DISCONNECT,
107         SEND_DATA,
108         SEND_FILE,
109         SHUTDOWN,
110         ACCEPTED,
111         UPDATED
112     };
113 
114     enum State {
115         STARTING, // Waiting for CONNECT/ACCEPTED/SHUTDOWN
116         RUNNING,
117         FAILED
118     };
119 
120     struct TaskData {
~TaskDataTaskData121         virtual ~TaskData() { }
122     };
123     struct ConnectInfo : public TaskData {
ConnectInfoConnectInfo124         ConnectInfo(string addr_, uint16_t port_, uint16_t localPort_, NatRoles natRole_, bool proxy_) : addr(addr_), port(port_), localPort(localPort_), natRole(natRole_), proxy(proxy_) { }
125         string addr;
126         uint16_t port;
127         uint16_t localPort;
128         NatRoles natRole;
129         bool proxy;
130     };
131     struct SendFileInfo : public TaskData {
SendFileInfoSendFileInfo132         SendFileInfo(InputStream* stream_) : stream(stream_) { }
133         InputStream* stream;
134     };
135 
136     BufferedSocket(char aSeparator);
137 
138     virtual ~BufferedSocket();
139 
140     CriticalSection cs;
141 
142     Semaphore taskSem;
143     deque<pair<Tasks, unique_ptr<TaskData> > > tasks;
144 
145     Modes mode;
146     std::unique_ptr<UnZFilter> filterIn;
147     int64_t dataBytes;
148     size_t rollback;
149     string line;
150     ByteVector inbuf;
151     ByteVector writeBuf;
152     ByteVector sendBuf;
153 
154     std::unique_ptr<Socket> sock;
155     State state;
156     bool disconnecting;
157 
158     virtual int run();
159 
160     void threadConnect(const string& aAddr, uint16_t aPort, uint16_t localPort, NatRoles natRole, bool proxy);
161     void threadAccept();
162     void threadRead();
163     void threadSendFile(InputStream* is);
164     void threadSendData();
165 
166     void fail(const string& aError);
167     static Atomic<long,memory_ordering_strong> sockets;
168 
169     bool checkEvents();
170     void checkSocket();
171 
172     void setSocket(std::unique_ptr<Socket> s);
173     void shutdown();
174     void addTask(Tasks task, TaskData* data);
175 };
176 
177 } // namespace dcpp
178