1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied.  See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 /*!
21  * \file ring_buffer.h
22  * \brief this file aims to provide a wrapper of sockets
23  */
24 #ifndef TVM_SUPPORT_RING_BUFFER_H_
25 #define TVM_SUPPORT_RING_BUFFER_H_
26 
27 #include <algorithm>
28 #include <cstring>
29 #include <vector>
30 
31 namespace tvm {
32 namespace support {
33 /*!
34  * \brief Ring buffer class for data buffering in IO.
35  *  Enables easy usage for sync and async mode.
36  */
37 class RingBuffer {
38  public:
39   /*! \brief Initial capacity of ring buffer. */
40   static const int kInitCapacity = 4 << 10;
41   /*! \brief constructor */
RingBuffer()42   RingBuffer() : ring_(kInitCapacity) {}
43   /*! \return number of bytes available in buffer. */
bytes_available()44   size_t bytes_available() const { return bytes_available_; }
45   /*! \return Current capacity of buffer. */
capacity()46   size_t capacity() const { return ring_.size(); }
47   /*!
48    *  Reserve capacity to be at least n.
49    *  Will only increase capacity if n is bigger than current capacity.
50    *
51    *  The effect of Reserve only lasts before the next call to Reserve.
52    *  Other functions in the ring buffer can also call into the reserve.
53    *
54    * \param n The size of capacity.
55    */
Reserve(size_t n)56   void Reserve(size_t n) {
57     if (ring_.size() < n) {
58       size_t old_size = ring_.size();
59       size_t new_size = static_cast<size_t>(n * 1.2);
60       ring_.resize(new_size);
61       if (head_ptr_ + bytes_available_ > old_size) {
62         // copy the ring overflow part into the tail.
63         size_t ncopy = head_ptr_ + bytes_available_ - old_size;
64         memcpy(&ring_[0] + old_size, &ring_[0], ncopy);
65       }
66     } else if (ring_.size() > n * 8 && ring_.size() > kInitCapacity) {
67       // shrink too large temporary buffer to
68       // avoid out of memory on some embedded devices
69       if (bytes_available_ != 0) {
70         // move existing bytes to the head.
71         size_t old_bytes = bytes_available_;
72         std::vector<char> tmp(old_bytes);
73         Read(&tmp[0], old_bytes);
74 
75         memcpy(&ring_[0], &tmp[0], old_bytes);
76         bytes_available_ = old_bytes;
77       }
78       // shrink the ring.
79       size_t new_size = kInitCapacity;
80       new_size = std::max(new_size, n);
81       new_size = std::max(new_size, bytes_available_);
82 
83       ring_.resize(new_size);
84       ring_.shrink_to_fit();
85       head_ptr_ = 0;
86     }
87   }
88 
89   /*!
90    * \brief Peform a non-blocking read from buffer
91    *  size must be smaller than this->bytes_available()
92    * \param data the data pointer.
93    * \param size The number of bytes to read.
94    */
Read(void * data,size_t size)95   void Read(void* data, size_t size) {
96     CHECK_GE(bytes_available_, size);
97     size_t ncopy = std::min(size, ring_.size() - head_ptr_);
98     memcpy(data, &ring_[0] + head_ptr_, ncopy);
99     if (ncopy < size) {
100       memcpy(reinterpret_cast<char*>(data) + ncopy, &ring_[0], size - ncopy);
101     }
102     head_ptr_ = (head_ptr_ + size) % ring_.size();
103     bytes_available_ -= size;
104   }
105   /*!
106    * \brief Read data from buffer with and put them to non-blocking send function.
107    *
108    * \param fsend A send function handle to put the data to.
109    * \param max_nbytes Maximum number of bytes can to read.
110    * \tparam FSend A non-blocking function with signature size_t (const void* data, size_t size);
111    */
112   template <typename FSend>
ReadWithCallback(FSend fsend,size_t max_nbytes)113   size_t ReadWithCallback(FSend fsend, size_t max_nbytes) {
114     size_t size = std::min(max_nbytes, bytes_available_);
115     CHECK_NE(size, 0U);
116     size_t ncopy = std::min(size, ring_.size() - head_ptr_);
117     size_t nsend = fsend(&ring_[0] + head_ptr_, ncopy);
118     bytes_available_ -= nsend;
119     if (ncopy == nsend && ncopy < size) {
120       size_t nsend2 = fsend(&ring_[0], size - ncopy);
121       bytes_available_ -= nsend2;
122       nsend += nsend2;
123     }
124     return nsend;
125   }
126   /*!
127    * \brief Write data into buffer, always ensures all data is written.
128    * \param data The data pointer
129    * \param size The size of data to be written.
130    */
Write(const void * data,size_t size)131   void Write(const void* data, size_t size) {
132     this->Reserve(bytes_available_ + size);
133     size_t tail = head_ptr_ + bytes_available_;
134     if (tail >= ring_.size()) {
135       memcpy(&ring_[0] + (tail - ring_.size()), data, size);
136     } else {
137       size_t ncopy = std::min(ring_.size() - tail, size);
138       memcpy(&ring_[0] + tail, data, ncopy);
139       if (ncopy < size) {
140         memcpy(&ring_[0], reinterpret_cast<const char*>(data) + ncopy, size - ncopy);
141       }
142     }
143     bytes_available_ += size;
144   }
145   /*!
146    * \brief Written data into the buffer by give it a non-blocking callback function.
147    *
148    * \param frecv A receive function handle
149    * \param max_nbytes Maximum number of bytes can write.
150    * \tparam FRecv A non-blocking function with signature size_t (void* data, size_t size);
151    */
152   template <typename FRecv>
WriteWithCallback(FRecv frecv,size_t max_nbytes)153   size_t WriteWithCallback(FRecv frecv, size_t max_nbytes) {
154     this->Reserve(bytes_available_ + max_nbytes);
155     size_t nbytes = max_nbytes;
156     size_t tail = head_ptr_ + bytes_available_;
157     if (tail >= ring_.size()) {
158       size_t nrecv = frecv(&ring_[0] + (tail - ring_.size()), nbytes);
159       bytes_available_ += nrecv;
160       return nrecv;
161     } else {
162       size_t ncopy = std::min(ring_.size() - tail, nbytes);
163       size_t nrecv = frecv(&ring_[0] + tail, ncopy);
164       bytes_available_ += nrecv;
165       if (nrecv == ncopy && ncopy < nbytes) {
166         size_t nrecv2 = frecv(&ring_[0], nbytes - ncopy);
167         bytes_available_ += nrecv2;
168         nrecv += nrecv2;
169       }
170       return nrecv;
171     }
172   }
173 
174  private:
175   // buffer head
176   size_t head_ptr_{0};
177   // number of bytes occupied in the buffer.
178   size_t bytes_available_{0};
179   // The internal data ring.
180   std::vector<char> ring_;
181 };
182 }  // namespace support
183 }  // namespace tvm
184 #endif  // TVM_SUPPORT_RING_BUFFER_H_
185