1 /*
2 SPDX-FileCopyrightText: 2005 Joris Guisson <joris.guisson@gmail.com>
3
4 SPDX-License-Identifier: GPL-2.0-or-later
5 */
6 #include "packetsocket.h"
7 #include "speed.h"
8 #include <net/socketmonitor.h>
9 #include <util/log.h>
10
11 using namespace bt;
12
13 namespace net
14 {
PacketSocket(SocketDevice * sock)15 PacketSocket::PacketSocket(SocketDevice *sock)
16 : TrafficShapedSocket(sock)
17 , ctrl_packets_sent(0)
18 , uploaded_data_bytes(0)
19 {
20 }
21
PacketSocket(int fd,int ip_version)22 PacketSocket::PacketSocket(int fd, int ip_version)
23 : TrafficShapedSocket(fd, ip_version)
24 , ctrl_packets_sent(0)
25 , uploaded_data_bytes(0)
26 {
27 }
28
PacketSocket(bool tcp,int ip_version)29 PacketSocket::PacketSocket(bool tcp, int ip_version)
30 : TrafficShapedSocket(tcp, ip_version)
31 , ctrl_packets_sent(0)
32 , uploaded_data_bytes(0)
33 {
34 }
35
~PacketSocket()36 PacketSocket::~PacketSocket()
37 {
38 }
39
selectPacket()40 Packet::Ptr PacketSocket::selectPacket()
41 {
42 QMutexLocker locker(&mutex);
43 Packet::Ptr ret(nullptr);
44 // this function should ensure that between
45 // each data packet at least 3 control packets are sent
46 // so requests can get through
47
48 if (ctrl_packets_sent < 3) {
49 // try to send another control packet
50 if (control_packets.size() > 0)
51 ret = control_packets.front();
52 else if (data_packets.size() > 0)
53 ret = data_packets.front();
54 } else {
55 if (data_packets.size() > 0) {
56 ctrl_packets_sent = 0;
57 ret = data_packets.front();
58 } else if (control_packets.size() > 0)
59 ret = control_packets.front();
60 }
61
62 if (ret)
63 preProcess(ret);
64
65 return ret;
66 }
67
write(Uint32 max,bt::TimeStamp now)68 Uint32 PacketSocket::write(Uint32 max, bt::TimeStamp now)
69 {
70 if (sock->state() == net::SocketDevice::CONNECTING && !sock->connectSuccesFull())
71 return 0;
72
73 if (!curr_packet)
74 curr_packet = selectPacket();
75
76 Uint32 written = 0;
77 while (curr_packet && (written < max || max == 0)) {
78 Uint32 limit = (max == 0) ? 0 : max - written;
79 int ret = curr_packet->send(sock, limit);
80 if (ret > 0) {
81 written += ret;
82 QMutexLocker locker(&mutex);
83 if (curr_packet->getType() == PIECE) {
84 up_speed->onData(ret, now);
85 uploaded_data_bytes += ret;
86 }
87 } else
88 break; // Socket buffer full, so stop sending for now
89
90 if (curr_packet->isSent()) {
91 // packet sent, so remove it
92 if (curr_packet->getType() == PIECE) {
93 QMutexLocker locker(&mutex);
94 if (!data_packets.empty())
95 data_packets.pop_front();
96 // reset ctrl_packets_sent so the next packet should be a ctrl packet
97 ctrl_packets_sent = 0;
98 } else {
99 QMutexLocker locker(&mutex);
100 if (!control_packets.empty())
101 control_packets.pop_front();
102 ctrl_packets_sent++;
103 }
104 curr_packet = selectPacket();
105 } else {
106 // we can't write it fully, so break out of loop
107 break;
108 }
109 }
110
111 return written;
112 }
113
addPacket(Packet::Ptr packet)114 void PacketSocket::addPacket(Packet::Ptr packet)
115 {
116 QMutexLocker locker(&mutex);
117 if (packet->getType() == PIECE)
118 data_packets.push_back(packet);
119 else
120 control_packets.push_back(packet);
121 // tell upload thread we have data ready should it be sleeping
122 net::SocketMonitor::instance().signalPacketReady();
123 }
124
bytesReadyToWrite() const125 bool PacketSocket::bytesReadyToWrite() const
126 {
127 QMutexLocker locker(&mutex);
128 return !data_packets.empty() || !control_packets.empty();
129 }
130
preProcess(Packet::Ptr packet)131 void PacketSocket::preProcess(Packet::Ptr packet)
132 {
133 Q_UNUSED(packet);
134 }
135
dataBytesUploaded()136 Uint32 PacketSocket::dataBytesUploaded()
137 {
138 QMutexLocker locker(&mutex);
139 Uint32 ret = uploaded_data_bytes;
140 uploaded_data_bytes = 0;
141 return ret;
142 }
143
clearPieces(bool reject)144 void PacketSocket::clearPieces(bool reject)
145 {
146 QMutexLocker locker(&mutex);
147
148 auto i = data_packets.begin();
149 while (i != data_packets.end()) {
150 Packet::Ptr p = *i;
151 if (p->getType() == bt::PIECE && !p->sending() && curr_packet != p) {
152 if (reject)
153 addPacket(Packet::Ptr(p->makeRejectOfPiece()));
154
155 i = data_packets.erase(i);
156 } else {
157 i++;
158 }
159 }
160 }
161
doNotSendPiece(const bt::Request & req,bool reject)162 void PacketSocket::doNotSendPiece(const bt::Request &req, bool reject)
163 {
164 QMutexLocker locker(&mutex);
165 auto i = data_packets.begin();
166 while (i != data_packets.end()) {
167 Packet::Ptr p = *i;
168 if (p->isPiece(req) && !p->sending() && p != curr_packet) {
169 i = data_packets.erase(i);
170 if (reject) {
171 // queue a reject packet
172 addPacket(Packet::Ptr(p->makeRejectOfPiece()));
173 }
174 } else {
175 i++;
176 }
177 }
178 }
179
numPendingPieceUploads() const180 Uint32 PacketSocket::numPendingPieceUploads() const
181 {
182 QMutexLocker locker(&mutex);
183 return data_packets.size();
184 }
185
186 }
187