1 // Copyright (c) 2015-2018 Josh Blum
2 // Copyright (c) 2016-2016 Bastille Networks
3 // SPDX-License-Identifier: BSL-1.0
4
5 #include "ServerStreamData.hpp"
6 #include "SoapyRemoteDefs.hpp"
7 #include "SoapyStreamEndpoint.hpp"
8 #include <SoapySDR/Device.hpp>
9 #include <SoapySDR/Logger.hpp>
10 #include <algorithm> //min
11 #include <thread>
12 #include <vector>
13 #include <cassert>
14
15 template <typename T>
incrementBuffs(std::vector<T> & buffs,size_t numElems,size_t elemSize)16 void incrementBuffs(std::vector<T> &buffs, size_t numElems, size_t elemSize)
17 {
18 for (auto &buff : buffs)
19 {
20 buff = T(size_t(buff) + (numElems*elemSize));
21 }
22 }
23
ServerStreamData(void)24 ServerStreamData::ServerStreamData(void):
25 device(nullptr),
26 stream(nullptr),
27 chanMask(0),
28 priority(0.0),
29 streamId(-1),
30 streamSock(nullptr),
31 statusSock(nullptr),
32 endpoint(nullptr),
33 streamThread(nullptr),
34 statusThread(nullptr),
35 done(true)
36 {
37 return;
38 }
39
~ServerStreamData(void)40 ServerStreamData::~ServerStreamData(void)
41 {
42 delete streamSock;
43 delete statusSock;
44 }
45
startSendThread(void)46 void ServerStreamData::startSendThread(void)
47 {
48 assert(streamId != -1);
49 done = false;
50 streamThread = new std::thread(&ServerStreamData::sendEndpointWork, this);
51 }
52
startRecvThread(void)53 void ServerStreamData::startRecvThread(void)
54 {
55 assert(streamId != -1);
56 done = false;
57 streamThread = new std::thread(&ServerStreamData::recvEndpointWork, this);
58 }
59
startStatThread(void)60 void ServerStreamData::startStatThread(void)
61 {
62 assert(streamId != -1);
63 done = false;
64 statusThread = new std::thread(&ServerStreamData::statEndpointWork, this);
65 }
66
stopThreads(void)67 void ServerStreamData::stopThreads(void)
68 {
69 done = true;
70 if (streamThread != nullptr)
71 {
72 streamThread->join();
73 delete streamThread;
74 }
75 if (statusThread != nullptr)
76 {
77 statusThread->join();
78 delete statusThread;
79 }
80 }
81
setThreadPrioWithLogging(const double priority)82 static void setThreadPrioWithLogging(const double priority)
83 {
84 const auto errorMsg = setThreadPrio(priority);
85 if (not errorMsg.empty()) SoapySDR::logf(SOAPY_SDR_WARNING,
86 "Set thread priority %g failed: %s", priority, errorMsg.c_str());
87 }
88
recvEndpointWork(void)89 void ServerStreamData::recvEndpointWork(void)
90 {
91 setThreadPrioWithLogging(priority);
92 assert(endpoint != nullptr);
93 assert(endpoint->getElemSize() != 0);
94 assert(endpoint->getNumChans() != 0);
95
96 //setup worker data structures
97 int ret = 0;
98 size_t handle = 0;
99 int flags = 0;
100 long long timeNs = 0;
101 const auto elemSize = endpoint->getElemSize();
102 std::vector<const void *> buffs(endpoint->getNumChans());
103
104 //loop forever until signaled done
105 //1) wait on the endpoint to become ready
106 //2) acquire the recv buffer from the endpoint
107 //3) write to the device stream from the endpoint buffer
108 //4) release the buffer back to the endpoint
109 while (not done)
110 {
111 if (not endpoint->waitRecv(SOAPY_REMOTE_SOCKET_TIMEOUT_US)) continue;
112 ret = endpoint->acquireRecv(handle, buffs.data(), flags, timeNs);
113 if (ret < 0)
114 {
115 SoapySDR::logf(SOAPY_SDR_ERROR, "Server-side receive endpoint: %s; worker quitting...", streamSock->lastErrorMsg());
116 return;
117 }
118
119 //loop to write to device
120 size_t elemsLeft = size_t(ret);
121 while (not done)
122 {
123 ret = device->writeStream(stream, buffs.data(), elemsLeft, flags, timeNs, SOAPY_REMOTE_SOCKET_TIMEOUT_US);
124 if (ret == SOAPY_SDR_TIMEOUT) continue;
125 if (ret < 0)
126 {
127 endpoint->writeStatus(ret, chanMask, flags, timeNs);
128 break; //discard after error, this may have been invalid flags or time
129 }
130 if (elemsLeft < (size_t)ret)
131 {
132 SoapySDR_logf(SOAPY_SDR_ERROR, "Server-side receive endpoint: device->writeStream wrote more elements than requested");
133 break; //stop after error
134 }
135 elemsLeft -= ret;
136 incrementBuffs(buffs, ret, elemSize);
137 if (elemsLeft == 0) break;
138 flags &= ~(SOAPY_SDR_HAS_TIME); //clear time for subsequent writes
139 }
140
141 //release the buffer back to the endpoint
142 endpoint->releaseRecv(handle);
143 }
144 }
145
sendEndpointWork(void)146 void ServerStreamData::sendEndpointWork(void)
147 {
148 setThreadPrioWithLogging(priority);
149 assert(endpoint != nullptr);
150 assert(endpoint->getElemSize() != 0);
151 assert(endpoint->getNumChans() != 0);
152
153 //setup worker data structures
154 int ret = 0;
155 size_t handle = 0;
156 int flags = 0;
157 long long timeNs = 0;
158 const auto elemSize = endpoint->getElemSize();
159 std::vector<void *> buffs(endpoint->getNumChans());
160 const size_t mtuElems = device->getStreamMTU(stream);
161
162 //loop forever until signaled done
163 //1) waits on the endpoint to become ready
164 //2) acquire the send buffer from the endpoint
165 //3) read from the device stream into the endpoint buffer
166 //4) release the buffer back to the endpoint (sends)
167 while (not done)
168 {
169 if (not endpoint->waitSend(SOAPY_REMOTE_SOCKET_TIMEOUT_US)) continue;
170 ret = endpoint->acquireSend(handle, buffs.data());
171 if (ret < 0)
172 {
173 SoapySDR::logf(SOAPY_SDR_ERROR, "Server-side send endpoint: %s; worker quitting...", streamSock->lastErrorMsg());
174 return;
175 }
176
177 //Read only up to MTU size with a timeout for minimal waiting.
178 //In the next section we will continue the read with non-blocking.
179 size_t elemsLeft = size_t(ret);
180 size_t elemsRead = 0;
181 while (not done)
182 {
183 flags = 0; //flags is an in/out parameter and must be cleared for consistency
184 const size_t numElems = std::min(mtuElems, elemsLeft);
185 ret = device->readStream(stream, buffs.data(), numElems, flags, timeNs, SOAPY_REMOTE_SOCKET_TIMEOUT_US);
186 if (ret == SOAPY_SDR_TIMEOUT) continue;
187 if (ret < 0)
188 {
189 //ret will be propagated to remote endpoint
190 break;
191 }
192 elemsLeft -= ret;
193 elemsRead += ret;
194 incrementBuffs(buffs, ret, elemSize);
195 break;
196 }
197
198 //fill remaining buffer with no timeout
199 //This is a latency optimization to forward to the host ASAP,
200 //but to use the full bandwidth when more data is available.
201 //Do not allow this optimization when end of burst or single packet mode to preserve boundaries
202 static const int trailingFlags(SOAPY_SDR_END_BURST | SOAPY_SDR_ONE_PACKET | SOAPY_SDR_END_ABRUPT);
203 if (elemsRead != 0 and elemsLeft != 0 and (flags & trailingFlags) == 0)
204 {
205 int flags1 = 0;
206 long long timeNs1 = 0;
207 ret = device->readStream(stream, buffs.data(), elemsLeft, flags1, timeNs1, 0);
208 if (ret == SOAPY_SDR_TIMEOUT) ret = 0; //timeouts OK
209 if (ret > 0)
210 {
211 elemsLeft -= ret;
212 elemsRead += ret;
213 }
214
215 //include trailing flags that come from the second read
216 flags |= (flags1 & trailingFlags);
217 }
218
219 //release the buffer with flags and time from the first read
220 //if any read call returned an error, forward the error instead
221 endpoint->releaseSend(handle, (ret < 0)?ret:elemsRead, flags, timeNs);
222 }
223 }
224
statEndpointWork(void)225 void ServerStreamData::statEndpointWork(void)
226 {
227 assert(endpoint != nullptr);
228
229 int ret = 0;
230 size_t chanMask = 0;
231 int flags = 0;
232 long long timeNs = 0;
233
234 while (not done)
235 {
236 ret = device->readStreamStatus(stream, chanMask, flags, timeNs, SOAPY_REMOTE_SOCKET_TIMEOUT_US);
237 if (ret == SOAPY_SDR_TIMEOUT) continue;
238 endpoint->writeStatus(ret, chanMask, flags, timeNs);
239
240 //exit the thread if stream status is not supported
241 //but only after reporting this to the local endpoint
242 if (ret == SOAPY_SDR_NOT_SUPPORTED) return;
243 }
244 }
245