1 // Copyright (c) 2015-2018 Josh Blum
2 // Copyright (c) 2016-2016 Bastille Networks
3 // SPDX-License-Identifier: BSL-1.0
4 
5 #include "ServerStreamData.hpp"
6 #include "SoapyRemoteDefs.hpp"
7 #include "SoapyStreamEndpoint.hpp"
8 #include <SoapySDR/Device.hpp>
9 #include <SoapySDR/Logger.hpp>
10 #include <algorithm> //min
11 #include <thread>
12 #include <vector>
13 #include <cassert>
14 
//Advance every pointer in buffs by numElems elements of elemSize bytes each.
//buffs is modified in place; T is used in this file as void * and const void *.
template <typename T>
void incrementBuffs(std::vector<T> &buffs, size_t numElems, size_t elemSize)
{
    //the byte offset is the same for every entry
    const size_t numBytes = numElems*elemSize;
    for (size_t i = 0; i < buffs.size(); i++)
    {
        //go through an integer because arithmetic on void pointers is not allowed
        buffs[i] = T(size_t(buffs[i]) + numBytes);
    }
}
23 
ServerStreamData(void)24 ServerStreamData::ServerStreamData(void):
25     device(nullptr),
26     stream(nullptr),
27     chanMask(0),
28     priority(0.0),
29     streamId(-1),
30     streamSock(nullptr),
31     statusSock(nullptr),
32     endpoint(nullptr),
33     streamThread(nullptr),
34     statusThread(nullptr),
35     done(true)
36 {
37     return;
38 }
39 
~ServerStreamData(void)40 ServerStreamData::~ServerStreamData(void)
41 {
42     delete streamSock;
43     delete statusSock;
44 }
45 
startSendThread(void)46 void ServerStreamData::startSendThread(void)
47 {
48     assert(streamId != -1);
49     done = false;
50     streamThread = new std::thread(&ServerStreamData::sendEndpointWork, this);
51 }
52 
startRecvThread(void)53 void ServerStreamData::startRecvThread(void)
54 {
55     assert(streamId != -1);
56     done = false;
57     streamThread = new std::thread(&ServerStreamData::recvEndpointWork, this);
58 }
59 
startStatThread(void)60 void ServerStreamData::startStatThread(void)
61 {
62     assert(streamId != -1);
63     done = false;
64     statusThread = new std::thread(&ServerStreamData::statEndpointWork, this);
65 }
66 
stopThreads(void)67 void ServerStreamData::stopThreads(void)
68 {
69     done = true;
70     if (streamThread != nullptr)
71     {
72         streamThread->join();
73         delete streamThread;
74     }
75     if (statusThread != nullptr)
76     {
77         statusThread->join();
78         delete statusThread;
79     }
80 }
81 
setThreadPrioWithLogging(const double priority)82 static void setThreadPrioWithLogging(const double priority)
83 {
84     const auto errorMsg = setThreadPrio(priority);
85     if (not errorMsg.empty()) SoapySDR::logf(SOAPY_SDR_WARNING,
86         "Set thread priority %g failed: %s", priority, errorMsg.c_str());
87 }
88 
recvEndpointWork(void)89 void ServerStreamData::recvEndpointWork(void)
90 {
91     setThreadPrioWithLogging(priority);
92     assert(endpoint != nullptr);
93     assert(endpoint->getElemSize() != 0);
94     assert(endpoint->getNumChans() != 0);
95 
96     //setup worker data structures
97     int ret = 0;
98     size_t handle = 0;
99     int flags = 0;
100     long long timeNs = 0;
101     const auto elemSize = endpoint->getElemSize();
102     std::vector<const void *> buffs(endpoint->getNumChans());
103 
104     //loop forever until signaled done
105     //1) wait on the endpoint to become ready
106     //2) acquire the recv buffer from the endpoint
107     //3) write to the device stream from the endpoint buffer
108     //4) release the buffer back to the endpoint
109     while (not done)
110     {
111         if (not endpoint->waitRecv(SOAPY_REMOTE_SOCKET_TIMEOUT_US)) continue;
112         ret = endpoint->acquireRecv(handle, buffs.data(), flags, timeNs);
113         if (ret < 0)
114         {
115             SoapySDR::logf(SOAPY_SDR_ERROR, "Server-side receive endpoint: %s; worker quitting...", streamSock->lastErrorMsg());
116             return;
117         }
118 
119         //loop to write to device
120         size_t elemsLeft = size_t(ret);
121         while (not done)
122         {
123             ret = device->writeStream(stream, buffs.data(), elemsLeft, flags, timeNs, SOAPY_REMOTE_SOCKET_TIMEOUT_US);
124             if (ret == SOAPY_SDR_TIMEOUT) continue;
125             if (ret < 0)
126             {
127                 endpoint->writeStatus(ret, chanMask, flags, timeNs);
128                 break; //discard after error, this may have been invalid flags or time
129             }
130             if (elemsLeft < (size_t)ret)
131             {
132                 SoapySDR_logf(SOAPY_SDR_ERROR, "Server-side receive endpoint: device->writeStream wrote more elements than requested");
133                 break; //stop after error
134             }
135             elemsLeft -= ret;
136             incrementBuffs(buffs, ret, elemSize);
137             if (elemsLeft == 0) break;
138             flags &= ~(SOAPY_SDR_HAS_TIME); //clear time for subsequent writes
139         }
140 
141         //release the buffer back to the endpoint
142         endpoint->releaseRecv(handle);
143     }
144 }
145 
sendEndpointWork(void)146 void ServerStreamData::sendEndpointWork(void)
147 {
148     setThreadPrioWithLogging(priority);
149     assert(endpoint != nullptr);
150     assert(endpoint->getElemSize() != 0);
151     assert(endpoint->getNumChans() != 0);
152 
153     //setup worker data structures
154     int ret = 0;
155     size_t handle = 0;
156     int flags = 0;
157     long long timeNs = 0;
158     const auto elemSize = endpoint->getElemSize();
159     std::vector<void *> buffs(endpoint->getNumChans());
160     const size_t mtuElems = device->getStreamMTU(stream);
161 
162     //loop forever until signaled done
163     //1) waits on the endpoint to become ready
164     //2) acquire the send buffer from the endpoint
165     //3) read from the device stream into the endpoint buffer
166     //4) release the buffer back to the endpoint (sends)
167     while (not done)
168     {
169         if (not endpoint->waitSend(SOAPY_REMOTE_SOCKET_TIMEOUT_US)) continue;
170         ret = endpoint->acquireSend(handle, buffs.data());
171         if (ret < 0)
172         {
173             SoapySDR::logf(SOAPY_SDR_ERROR, "Server-side send endpoint: %s; worker quitting...", streamSock->lastErrorMsg());
174             return;
175         }
176 
177         //Read only up to MTU size with a timeout for minimal waiting.
178         //In the next section we will continue the read with non-blocking.
179         size_t elemsLeft = size_t(ret);
180         size_t elemsRead = 0;
181         while (not done)
182         {
183             flags = 0; //flags is an in/out parameter and must be cleared for consistency
184             const size_t numElems = std::min(mtuElems, elemsLeft);
185             ret = device->readStream(stream, buffs.data(), numElems, flags, timeNs, SOAPY_REMOTE_SOCKET_TIMEOUT_US);
186             if (ret == SOAPY_SDR_TIMEOUT) continue;
187             if (ret < 0)
188             {
189                 //ret will be propagated to remote endpoint
190                 break;
191             }
192             elemsLeft -= ret;
193             elemsRead += ret;
194             incrementBuffs(buffs, ret, elemSize);
195             break;
196         }
197 
198         //fill remaining buffer with no timeout
199         //This is a latency optimization to forward to the host ASAP,
200         //but to use the full bandwidth when more data is available.
201         //Do not allow this optimization when end of burst or single packet mode to preserve boundaries
202         static const int trailingFlags(SOAPY_SDR_END_BURST | SOAPY_SDR_ONE_PACKET | SOAPY_SDR_END_ABRUPT);
203         if (elemsRead != 0 and elemsLeft != 0 and (flags & trailingFlags) == 0)
204         {
205             int flags1 = 0;
206             long long timeNs1 = 0;
207             ret = device->readStream(stream, buffs.data(), elemsLeft, flags1, timeNs1, 0);
208             if (ret == SOAPY_SDR_TIMEOUT) ret = 0; //timeouts OK
209             if (ret > 0)
210             {
211                 elemsLeft -= ret;
212                 elemsRead += ret;
213             }
214 
215             //include trailing flags that come from the second read
216             flags |= (flags1 & trailingFlags);
217         }
218 
219         //release the buffer with flags and time from the first read
220         //if any read call returned an error, forward the error instead
221         endpoint->releaseSend(handle, (ret < 0)?ret:elemsRead, flags, timeNs);
222     }
223 }
224 
statEndpointWork(void)225 void ServerStreamData::statEndpointWork(void)
226 {
227     assert(endpoint != nullptr);
228 
229     int ret = 0;
230     size_t chanMask = 0;
231     int flags = 0;
232     long long timeNs = 0;
233 
234     while (not done)
235     {
236         ret = device->readStreamStatus(stream, chanMask, flags, timeNs, SOAPY_REMOTE_SOCKET_TIMEOUT_US);
237         if (ret == SOAPY_SDR_TIMEOUT) continue;
238         endpoint->writeStatus(ret, chanMask, flags, timeNs);
239 
240         //exit the thread if stream status is not supported
241         //but only after reporting this to the local endpoint
242         if (ret == SOAPY_SDR_NOT_SUPPORTED) return;
243     }
244 }
245