1 /***********************************************************************************************************************************
2 Remote Storage Read
3 ***********************************************************************************************************************************/
4 #include "build.auto.h"
5
6 #include <fcntl.h>
7 #include <unistd.h>
8
9 #include "common/compress/helper.h"
10 #include "common/debug.h"
11 #include "common/io/read.h"
12 #include "common/log.h"
13 #include "common/memContext.h"
14 #include "common/type/convert.h"
15 #include "common/type/json.h"
16 #include "common/type/object.h"
17 #include "storage/remote/protocol.h"
18 #include "storage/remote/read.h"
19 #include "storage/read.intern.h"
20
21 /***********************************************************************************************************************************
22 Object type
23 ***********************************************************************************************************************************/
24 typedef struct StorageReadRemote
25 {
26 MemContext *memContext; // Object mem context
27 StorageReadInterface interface; // Interface
28 StorageRemote *storage; // Storage that created this object
29 StorageRead *read; // Storage read interface
30
31 ProtocolClient *client; // Protocol client for requests
32 size_t remaining; // Bytes remaining to be read in block
33 Buffer *block; // Block currently being read
34 bool eof; // Has the file reached eof?
35
36 #ifdef DEBUG
37 uint64_t protocolReadBytes; // How many bytes were read from the protocol layer?
38 #endif
39 } StorageReadRemote;
40
41 /***********************************************************************************************************************************
42 Macros for function logging
43 ***********************************************************************************************************************************/
44 #define FUNCTION_LOG_STORAGE_READ_REMOTE_TYPE \
45 StorageReadRemote *
46 #define FUNCTION_LOG_STORAGE_READ_REMOTE_FORMAT(value, buffer, bufferSize) \
47 objToLog(value, "StorageReadRemote", buffer, bufferSize)
48
49 /***********************************************************************************************************************************
50 Open the file
51 ***********************************************************************************************************************************/
52 static bool
storageReadRemoteOpen(THIS_VOID)53 storageReadRemoteOpen(THIS_VOID)
54 {
55 THIS(StorageReadRemote);
56
57 FUNCTION_LOG_BEGIN(logLevelTrace);
58 FUNCTION_LOG_PARAM(STORAGE_READ_REMOTE, this);
59 FUNCTION_LOG_END();
60
61 ASSERT(this != NULL);
62
63 bool result = false;
64
65 MEM_CONTEXT_TEMP_BEGIN()
66 {
67 // If the file is compressible add compression filter on the remote
68 if (this->interface.compressible)
69 {
70 ioFilterGroupAdd(
71 ioReadFilterGroup(storageReadIo(this->read)), compressFilter(compressTypeGz, (int)this->interface.compressLevel));
72 }
73
74 ProtocolCommand *command = protocolCommandNew(PROTOCOL_COMMAND_STORAGE_OPEN_READ);
75 PackWrite *const param = protocolCommandParam(command);
76
77 pckWriteStrP(param, this->interface.name);
78 pckWriteBoolP(param, this->interface.ignoreMissing);
79 pckWriteStrP(param, jsonFromVar(this->interface.limit));
80 pckWriteStrP(param, jsonFromVar(ioFilterGroupParamAll(ioReadFilterGroup(storageReadIo(this->read)))));
81
82 protocolClientCommandPut(this->client, command);
83
84 // If the file exists
85 result = pckReadBoolP(protocolClientDataGet(this->client));
86
87 if (result)
88 {
89 // Clear filters since they will be run on the remote side
90 ioFilterGroupClear(ioReadFilterGroup(storageReadIo(this->read)));
91
92 // If the file is compressible add decompression filter locally
93 if (this->interface.compressible)
94 ioFilterGroupAdd(ioReadFilterGroup(storageReadIo(this->read)), decompressFilter(compressTypeGz));
95 }
96 // Else nothing to do
97 else
98 protocolClientDataEndGet(this->client);
99 }
100 MEM_CONTEXT_TEMP_END();
101
102 FUNCTION_LOG_RETURN(BOOL, result);
103 }
104
105 /***********************************************************************************************************************************
106 Read from a file
107 ***********************************************************************************************************************************/
108 static size_t
storageReadRemote(THIS_VOID,Buffer * buffer,bool block)109 storageReadRemote(THIS_VOID, Buffer *buffer, bool block)
110 {
111 THIS(StorageReadRemote);
112
113 FUNCTION_LOG_BEGIN(logLevelTrace);
114 FUNCTION_LOG_PARAM(STORAGE_READ_REMOTE, this);
115 FUNCTION_LOG_PARAM(BUFFER, buffer);
116 FUNCTION_LOG_PARAM(BOOL, block);
117 FUNCTION_LOG_END();
118
119 ASSERT(this != NULL);
120 ASSERT(buffer != NULL && !bufFull(buffer));
121
122 size_t result = 0;
123
124 // Read if eof has not been reached
125 if (!this->eof)
126 {
127 do
128 {
129 // If no bytes remaining then read a new block
130 if (this->remaining == 0)
131 {
132 MEM_CONTEXT_TEMP_BEGIN()
133 {
134 PackRead *const read = protocolClientDataGet(this->client);
135 pckReadNext(read);
136
137 // If binary then read the next block
138 if (pckReadType(read) == pckTypeBin)
139 {
140 MEM_CONTEXT_BEGIN(this->memContext)
141 {
142 this->block = pckReadBinP(read);
143 this->remaining = bufUsed(this->block);
144 }
145 MEM_CONTEXT_END();
146 }
147 // Else read is complete and get the filter list
148 else
149 {
150 bufFree(this->block);
151
152 ioFilterGroupResultAllSet(ioReadFilterGroup(storageReadIo(this->read)), jsonToVar(pckReadStrP(read)));
153 this->eof = true;
154
155 protocolClientDataEndGet(this->client);
156 }
157
158 #ifdef DEBUG
159 this->protocolReadBytes += this->remaining;
160 #endif
161 }
162 MEM_CONTEXT_TEMP_END();
163 }
164
165 // Read if not eof
166 if (!this->eof)
167 {
168 // If the buffer can contain all remaining bytes
169 if (bufRemains(buffer) >= this->remaining)
170 {
171 bufCatSub(buffer, this->block, bufUsed(this->block) - this->remaining, this->remaining);
172
173 this->remaining = 0;
174 bufFree(this->block);
175 this->block = NULL;
176 }
177 // Else read what we can
178 else
179 {
180 size_t remains = bufRemains(buffer);
181 bufCatSub(buffer, this->block, bufUsed(this->block) - this->remaining, remains);
182 this->remaining -= remains;
183 }
184 }
185 }
186 while (!this->eof && !bufFull(buffer));
187 }
188
189 FUNCTION_LOG_RETURN(SIZE, result);
190 }
191
192 /***********************************************************************************************************************************
193 Has file reached EOF?
194 ***********************************************************************************************************************************/
195 static bool
storageReadRemoteEof(THIS_VOID)196 storageReadRemoteEof(THIS_VOID)
197 {
198 THIS(StorageReadRemote);
199
200 FUNCTION_TEST_BEGIN();
201 FUNCTION_TEST_PARAM(STORAGE_READ_REMOTE, this);
202 FUNCTION_TEST_END();
203
204 ASSERT(this != NULL);
205
206 FUNCTION_TEST_RETURN(this->eof);
207 }
208
209 /**********************************************************************************************************************************/
210 StorageRead *
storageReadRemoteNew(StorageRemote * storage,ProtocolClient * client,const String * name,bool ignoreMissing,bool compressible,unsigned int compressLevel,const Variant * limit)211 storageReadRemoteNew(
212 StorageRemote *storage, ProtocolClient *client, const String *name, bool ignoreMissing, bool compressible,
213 unsigned int compressLevel, const Variant *limit)
214 {
215 FUNCTION_LOG_BEGIN(logLevelTrace);
216 FUNCTION_LOG_PARAM(STORAGE_REMOTE, storage);
217 FUNCTION_LOG_PARAM(PROTOCOL_CLIENT, client);
218 FUNCTION_LOG_PARAM(STRING, name);
219 FUNCTION_LOG_PARAM(BOOL, ignoreMissing);
220 FUNCTION_LOG_PARAM(BOOL, compressible);
221 FUNCTION_LOG_PARAM(UINT, compressLevel);
222 FUNCTION_LOG_PARAM(VARIANT, limit);
223 FUNCTION_LOG_END();
224
225 ASSERT(storage != NULL);
226 ASSERT(client != NULL);
227 ASSERT(name != NULL);
228
229 StorageReadRemote *this = NULL;
230
231 MEM_CONTEXT_NEW_BEGIN("StorageReadRemote")
232 {
233 this = memNew(sizeof(StorageReadRemote));
234
235 *this = (StorageReadRemote)
236 {
237 .memContext = MEM_CONTEXT_NEW(),
238 .storage = storage,
239 .client = client,
240
241 .interface = (StorageReadInterface)
242 {
243 .type = STORAGE_REMOTE_TYPE,
244 .name = strDup(name),
245 .compressible = compressible,
246 .compressLevel = compressLevel,
247 .ignoreMissing = ignoreMissing,
248 .limit = varDup(limit),
249
250 .ioInterface = (IoReadInterface)
251 {
252 .eof = storageReadRemoteEof,
253 .open = storageReadRemoteOpen,
254 .read = storageReadRemote,
255 },
256 },
257 };
258
259 this->read = storageReadNew(this, &this->interface);
260 }
261 MEM_CONTEXT_NEW_END();
262
263 ASSERT(this != NULL);
264 FUNCTION_LOG_RETURN(STORAGE_READ, this->read);
265 }
266