1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 module thrift.transport.memory;
20
21 import core.exception : onOutOfMemoryError;
22 import core.stdc.stdlib : free, realloc;
23 import std.algorithm : min;
24 import std.conv : text;
25 import thrift.transport.base;
26
27 /**
28 * A transport that simply reads from and writes to an in-memory buffer. Every
29 * time you call write on it, the data is simply placed into a buffer, and
30 * every time you call read, data is consumed from that buffer.
31 *
32 * Currently, the storage for written data is never reclaimed, even if the
33 * buffer contents have already been read out again.
34 */
final class TMemoryBuffer : TBaseTransport {
  /**
   * Constructs a new memory transport with an empty internal buffer.
   */
  this() {}

  /**
   * Constructs a new memory transport with an empty internal buffer,
   * reserving space for capacity bytes in advance.
   *
   * If the amount of data which will be written to the buffer is already
   * known on construction, this can improve performance over the default
   * constructor because reallocations can be avoided.
   *
   * If the preallocated buffer is exhausted, data can still be written to the
   * transport, but reallocations will happen.
   *
   * Params:
   *   capacity = Size of the initially reserved buffer (in bytes).
   */
  this(size_t capacity) {
    reset(capacity);
  }

  /**
   * Constructs a new memory transport initially containing the passed data.
   *
   * For now, the passed buffer is not intelligently used, the data is just
   * copied to the internal buffer.
   *
   * Params:
   *   contents = Initial contents available to be read.
   */
  this(in ubyte[] contents) {
    auto size = contents.length;
    reset(size);
    buffer_[0 .. size] = contents[];
    // Everything that was copied in is immediately available for reading.
    writeOffset_ = size;
  }

  /**
   * Destructor, frees the internally allocated buffer.
   */
  ~this() {
    free(buffer_);
  }

  /**
   * Returns a read-only view of the current buffer contents.
   *
   * Note: For performance reasons, the returned slice is only valid for the
   * life of this object, and may be invalidated on the next write() call at
   * will – you might want to immediately .dup it if you intend to keep it
   * around.
   */
  const(ubyte)[] getContents() {
    return buffer_[readOffset_ .. writeOffset_];
  }

  /**
   * A memory transport is always open.
   */
  override bool isOpen() @property {
    return true;
  }

  /**
   * Returns whether there is written data which has not been read yet.
   */
  override bool peek() {
    return writeOffset_ - readOffset_ > 0;
  }

  /**
   * Opening is a no-op for a memory buffer.
   */
  override void open() {}

  /**
   * Closing is a no-op for a memory buffer, it is always open.
   */
  override void close() {}

  /**
   * Copies as many unread bytes as fit into buf and marks them as read.
   *
   * Params:
   *   buf = Destination slice; at most buf.length bytes are copied.
   *
   * Returns: The number of bytes actually copied, which is zero if no unread
   *   data is left.
   */
  override size_t read(ubyte[] buf) {
    auto size = min(buf.length, writeOffset_ - readOffset_);
    buf[0 .. size] = buffer_[readOffset_ .. readOffset_ + size];
    readOffset_ += size;
    return size;
  }

  /**
   * Shortcut version of readAll() – using this over TBaseTransport.readAll()
   * can give us a nice speed increase because it is typically a very hot path
   * during deserialization.
   *
   * Params:
   *   buf = Destination slice, which is filled completely.
   *
   * Throws: TTransportException (END_OF_FILE) if fewer than buf.length unread
   *   bytes are available; the read position is left untouched in that case.
   */
  override void readAll(ubyte[] buf) {
    auto available = writeOffset_ - readOffset_;
    if (buf.length > available) {
      throw new TTransportException(text("Cannot readAll() ", buf.length,
        " bytes of data because only ", available, " bytes are available."),
        TTransportException.Type.END_OF_FILE);
    }

    buf[] = buffer_[readOffset_ .. readOffset_ + buf.length];
    readOffset_ += buf.length;
  }

  /**
   * Appends the passed data to the buffer, growing the internal storage if
   * the space behind the write position is too small.
   *
   * Params:
   *   buf = The bytes to append.
   */
  override void write(in ubyte[] buf) {
    auto need = buf.length;
    if (bufferLen_ - writeOffset_ < need) {
      // Exponential growth. Starting from bufferLen_ + 1 guarantees a
      // non-zero size, so doubling makes progress even for an empty buffer.
      auto newLen = bufferLen_ + 1;
      while (newLen - writeOffset_ < need) newLen *= 2;
      cRealloc(buffer_, newLen);
      bufferLen_ = newLen;
    }

    buffer_[writeOffset_ .. writeOffset_ + need] = buf[];
    writeOffset_ += need;
  }

  /**
   * Returns a view of the unread data without marking it as read.
   *
   * Params:
   *   buf = Not used by this implementation.
   *   len = Minimum number of bytes the caller needs.
   *
   * Returns: A slice covering all unread data (which may be longer than len)
   *   if at least len bytes are available, null otherwise.
   */
  override const(ubyte)[] borrow(ubyte* buf, size_t len) {
    if (len <= writeOffset_ - readOffset_) {
      return buffer_[readOffset_ .. writeOffset_];
    } else {
      return null;
    }
  }

  /**
   * Marks len bytes of unread data as read.
   *
   * Params:
   *   len = Number of bytes to skip.
   *
   * Throws: TTransportException (END_OF_FILE) if len exceeds the amount of
   *   unread data; the read position is left untouched in that case.
   */
  override void consume(size_t len) {
    // Without this check the read position could move past the write
    // position; the unsigned subtraction used to compute the amount of
    // unread data would then wrap around and later reads would slice the raw
    // buffer pointer out of bounds.
    auto available = writeOffset_ - readOffset_;
    if (len > available) {
      throw new TTransportException(text("Cannot consume() ", len,
        " bytes of data because only ", available, " bytes are available."),
        TTransportException.Type.END_OF_FILE);
    }

    readOffset_ += len;
  }

  /**
   * Discards all buffered data by moving the read and write positions back
   * to the start. The allocated storage is kept.
   */
  void reset() {
    readOffset_ = 0;
    writeOffset_ = 0;
  }

  /**
   * Discards all buffered data like reset(), and additionally grows the
   * internal storage to capacity bytes if it is currently smaller.
   *
   * Params:
   *   capacity = Minimum size of the internal buffer (in bytes) afterwards.
   */
  void reset(size_t capacity) {
    readOffset_ = 0;
    writeOffset_ = 0;
    if (bufferLen_ < capacity) {
      cRealloc(buffer_, capacity);
      bufferLen_ = capacity;
    }
  }

private:
  ubyte* buffer_;       // Storage obtained via cRealloc(), null until first allocation.
  size_t bufferLen_;    // Number of bytes allocated at buffer_.
  size_t readOffset_;   // Offset of the next byte to be read.
  size_t writeOffset_;  // Offset at which the next written byte is stored.
}
185
private {
  /**
   * Resizes the C heap allocation referenced by data to newSize bytes and
   * stores the resulting pointer back into data.
   *
   * If realloc() returns null, onOutOfMemoryError() is invoked and data is
   * left unchanged.
   */
  void cRealloc(ref ubyte* data, size_t newSize) {
    if (auto grown = cast(ubyte*)realloc(data, newSize)) {
      data = grown;
    } else {
      onOutOfMemoryError();
    }
  }
}
193
version(unittest)194 version (unittest) {
195 import std.exception;
196 }
197
unittest {
  immutable(ubyte[]) payload = [1, 2, 3, 4];
  auto scratch = new ubyte[payload.length];

  // Verifies that the given transport has no unread data left: peek() is
  // false, read() yields nothing and readAll() throws.
  void enforceDrained(TMemoryBuffer transport) {
    enforce(!transport.peek());
    enforce(transport.read(scratch) == 0);
    assertThrown!TTransportException(transport.readAll(scratch));
  }

  auto preallocated = new TMemoryBuffer(5);
  enforce(preallocated.isOpen);

  // A freshly constructed transport holds no data.
  enforceDrained(preallocated);

  // Round trip: whatever is written can be inspected and read back.
  preallocated.write(payload);
  enforce(preallocated.peek());
  enforce(preallocated.getContents() == payload);
  enforce(preallocated.read(scratch) == payload.length);
  enforce(scratch == payload);

  // After reading everything back, the transport is empty again.
  enforceDrained(preallocated);

  // The constructor taking initial data makes it readable right away.
  auto preloaded = new TMemoryBuffer(payload);
  enforce(preloaded.isOpen);
  enforce(preloaded.peek());
  enforce(preloaded.getContents() == payload);

  // borrow() exposes the data without consuming it; consume() then skips it.
  auto view = preloaded.borrow(null, payload.length);
  enforce(view == payload);
  enforce(preloaded.peek());
  preloaded.consume(payload.length);
  enforce(!preloaded.peek());
}
234