1 /*
2     SPDX-FileCopyrightText: 2005 Joris Guisson <joris.guisson@gmail.com>
3 
4     SPDX-License-Identifier: GPL-2.0-or-later
5 */
6 #include "packetsocket.h"
7 #include "speed.h"
8 #include <net/socketmonitor.h>
9 #include <util/log.h>
10 
11 using namespace bt;
12 
13 namespace net
14 {
PacketSocket(SocketDevice * sock)15 PacketSocket::PacketSocket(SocketDevice *sock)
16     : TrafficShapedSocket(sock)
17     , ctrl_packets_sent(0)
18     , uploaded_data_bytes(0)
19 {
20 }
21 
PacketSocket(int fd,int ip_version)22 PacketSocket::PacketSocket(int fd, int ip_version)
23     : TrafficShapedSocket(fd, ip_version)
24     , ctrl_packets_sent(0)
25     , uploaded_data_bytes(0)
26 {
27 }
28 
PacketSocket(bool tcp,int ip_version)29 PacketSocket::PacketSocket(bool tcp, int ip_version)
30     : TrafficShapedSocket(tcp, ip_version)
31     , ctrl_packets_sent(0)
32     , uploaded_data_bytes(0)
33 {
34 }
35 
~PacketSocket()36 PacketSocket::~PacketSocket()
37 {
38 }
39 
selectPacket()40 Packet::Ptr PacketSocket::selectPacket()
41 {
42     QMutexLocker locker(&mutex);
43     Packet::Ptr ret(nullptr);
44     // this function should ensure that between
45     // each data packet at least 3 control packets are sent
46     // so requests can get through
47 
48     if (ctrl_packets_sent < 3) {
49         // try to send another control packet
50         if (control_packets.size() > 0)
51             ret = control_packets.front();
52         else if (data_packets.size() > 0)
53             ret = data_packets.front();
54     } else {
55         if (data_packets.size() > 0) {
56             ctrl_packets_sent = 0;
57             ret = data_packets.front();
58         } else if (control_packets.size() > 0)
59             ret = control_packets.front();
60     }
61 
62     if (ret)
63         preProcess(ret);
64 
65     return ret;
66 }
67 
write(Uint32 max,bt::TimeStamp now)68 Uint32 PacketSocket::write(Uint32 max, bt::TimeStamp now)
69 {
70     if (sock->state() == net::SocketDevice::CONNECTING && !sock->connectSuccesFull())
71         return 0;
72 
73     if (!curr_packet)
74         curr_packet = selectPacket();
75 
76     Uint32 written = 0;
77     while (curr_packet && (written < max || max == 0)) {
78         Uint32 limit = (max == 0) ? 0 : max - written;
79         int ret = curr_packet->send(sock, limit);
80         if (ret > 0) {
81             written += ret;
82             QMutexLocker locker(&mutex);
83             if (curr_packet->getType() == PIECE) {
84                 up_speed->onData(ret, now);
85                 uploaded_data_bytes += ret;
86             }
87         } else
88             break; // Socket buffer full, so stop sending for now
89 
90         if (curr_packet->isSent()) {
91             // packet sent, so remove it
92             if (curr_packet->getType() == PIECE) {
93                 QMutexLocker locker(&mutex);
94                 if (!data_packets.empty())
95                     data_packets.pop_front();
96                 // reset ctrl_packets_sent so the next packet should be a ctrl packet
97                 ctrl_packets_sent = 0;
98             } else {
99                 QMutexLocker locker(&mutex);
100                 if (!control_packets.empty())
101                     control_packets.pop_front();
102                 ctrl_packets_sent++;
103             }
104             curr_packet = selectPacket();
105         } else {
106             // we can't write it fully, so break out of loop
107             break;
108         }
109     }
110 
111     return written;
112 }
113 
addPacket(Packet::Ptr packet)114 void PacketSocket::addPacket(Packet::Ptr packet)
115 {
116     QMutexLocker locker(&mutex);
117     if (packet->getType() == PIECE)
118         data_packets.push_back(packet);
119     else
120         control_packets.push_back(packet);
121     // tell upload thread we have data ready should it be sleeping
122     net::SocketMonitor::instance().signalPacketReady();
123 }
124 
bytesReadyToWrite() const125 bool PacketSocket::bytesReadyToWrite() const
126 {
127     QMutexLocker locker(&mutex);
128     return !data_packets.empty() || !control_packets.empty();
129 }
130 
preProcess(Packet::Ptr packet)131 void PacketSocket::preProcess(Packet::Ptr packet)
132 {
133     Q_UNUSED(packet);
134 }
135 
dataBytesUploaded()136 Uint32 PacketSocket::dataBytesUploaded()
137 {
138     QMutexLocker locker(&mutex);
139     Uint32 ret = uploaded_data_bytes;
140     uploaded_data_bytes = 0;
141     return ret;
142 }
143 
clearPieces(bool reject)144 void PacketSocket::clearPieces(bool reject)
145 {
146     QMutexLocker locker(&mutex);
147 
148     auto i = data_packets.begin();
149     while (i != data_packets.end()) {
150         Packet::Ptr p = *i;
151         if (p->getType() == bt::PIECE && !p->sending() && curr_packet != p) {
152             if (reject)
153                 addPacket(Packet::Ptr(p->makeRejectOfPiece()));
154 
155             i = data_packets.erase(i);
156         } else {
157             i++;
158         }
159     }
160 }
161 
doNotSendPiece(const bt::Request & req,bool reject)162 void PacketSocket::doNotSendPiece(const bt::Request &req, bool reject)
163 {
164     QMutexLocker locker(&mutex);
165     auto i = data_packets.begin();
166     while (i != data_packets.end()) {
167         Packet::Ptr p = *i;
168         if (p->isPiece(req) && !p->sending() && p != curr_packet) {
169             i = data_packets.erase(i);
170             if (reject) {
171                 // queue a reject packet
172                 addPacket(Packet::Ptr(p->makeRejectOfPiece()));
173             }
174         } else {
175             i++;
176         }
177     }
178 }
179 
numPendingPieceUploads() const180 Uint32 PacketSocket::numPendingPieceUploads() const
181 {
182     QMutexLocker locker(&mutex);
183     return data_packets.size();
184 }
185 
186 }
187