1 // Copyright (c) 2015-2016 Josh Blum
2 // SPDX-License-Identifier: BSL-1.0
3 
4 #pragma once
5 #include "SoapyRemoteConfig.hpp"
6 #include <cstddef>
7 #include <vector>
8 
9 class SoapyRPCSocket;
10 
11 /*!
12  * The stream endpoint supports a windowed link datagram protocol.
13  * This endpoint can be operated in only one mode: receive or send,
14  * and must be paired with another differently configured endpoint.
15  */
16 class SOAPY_REMOTE_API SoapyStreamEndpoint
17 {
18 public:
19     SoapyStreamEndpoint(
20         SoapyRPCSocket &streamSock,
21         SoapyRPCSocket &statusSock,
22         const bool datagramMode,
23         const bool isRecv,
24         const size_t numChans,
25         const size_t elemSize,
26         const size_t mtu,
27         const size_t window);
28 
29     ~SoapyStreamEndpoint(void);
30 
31     //! How many channels configured
getNumChans(void) const32     size_t getNumChans(void) const
33     {
34         return _numChans;
35     }
36 
37     //! Element size in bytes
getElemSize(void) const38     size_t getElemSize(void) const
39     {
40         return _elemSize;
41     }
42 
43     //! Actual buffer size in elements
getBuffSize(void) const44     size_t getBuffSize(void) const
45     {
46         return _buffSize;
47     }
48 
49     //! Actual number of buffers
getNumBuffs(void) const50     size_t getNumBuffs(void) const
51     {
52         return _numBuffs;
53     }
54 
55     //! Query handle addresses
getAddrs(const size_t handle,void ** buffs) const56     void getAddrs(const size_t handle, void **buffs) const
57     {
58         for (size_t i = 0; i < _numChans; i++)
59         {
60             buffs[i] = _buffData[handle].buffs[i];
61         }
62     }
63 
64     /*******************************************************************
65      * receive endpoint API
66      ******************************************************************/
67 
68     /*!
69      * Wait for a datagram to arrive at the socket
70      * return true when ready for false for timeout.
71      */
72     bool waitRecv(const long timeoutUs);
73 
74     /*!
75      * Acquire a receive buffer with metadata.
76      * return the number of elements or error code
77      */
78     int acquireRecv(size_t &handle, const void **buffs, int &flags, long long &timeNs);
79 
80     /*!
81      * Release the buffer when done.
82      */
83     void releaseRecv(const size_t handle);
84 
85     /*******************************************************************
86      * send endpoint API
87      ******************************************************************/
88 
89     /*!
90      * Wait for the flow control to allow transmission.
91      * return true when ready for false for timeout.
92      */
93     bool waitSend(const long timeoutUs);
94 
95     /*!
96      * Acquire a receive buffer with metadata.
97      */
98     int acquireSend(size_t &handle, void **buffs);
99 
100     /*!
101      * Release the buffer when done.
102      * pass in the number of elements or error code
103      */
104     void releaseSend(const size_t handle, const int numElemsOrErr, int &flags, const long long timeNs);
105 
106     /*******************************************************************
107      * status endpoint API -- used by both directions
108      ******************************************************************/
109 
110     /*!
111      * Wait for a status message to arrive
112      */
113     bool waitStatus(const long timeoutUs);
114 
115     /*!
116      * Read the stream status data.
117      * Return 0 or error code.
118      */
119     int readStatus(size_t &chanMask, int &flags, long long &timeNs);
120 
121     /*!
122      * Write the stream status from the forwarder.
123      */
124     void writeStatus(const int code, const size_t chanMask, const int flags, const long long timeNs);
125 
126 private:
127     SoapyRPCSocket &_streamSock;
128     SoapyRPCSocket &_statusSock;
129     const bool _datagramMode;
130     const size_t _xferSize;
131     const size_t _numChans;
132     const size_t _elemSize;
133     const size_t _buffSize;
134     const size_t _numBuffs;
135 
136     struct BufferData
137     {
138         std::vector<char> buff; //actual POD
139         std::vector<void *> buffs; //pointers
140         bool acquired;
141     };
142     std::vector<BufferData> _buffData;
143 
144     //acquire+release tracking
145     size_t _nextHandleAcquire;
146     size_t _nextHandleRelease;
147     size_t _numHandlesAcquired;
148 
149     //sequence tracking
150     size_t _lastSendSequence;
151     size_t _lastRecvSequence;
152     size_t _maxInFlightSeqs;
153     bool _receiveInitial;
154 
155     //how often to send a flow control ACK? (recv only)
156     size_t _triggerAckWindow;
157 
158     //flow control helpers
159     void sendACK(void);
160     void recvACK(void);
161 };
162