1 /***********************************************************************************************************************************
2 Remote Storage Read
3 ***********************************************************************************************************************************/
4 #include "build.auto.h"
5 
6 #include <fcntl.h>
7 #include <unistd.h>
8 
9 #include "common/compress/helper.h"
10 #include "common/debug.h"
11 #include "common/io/read.h"
12 #include "common/log.h"
13 #include "common/memContext.h"
14 #include "common/type/convert.h"
15 #include "common/type/json.h"
16 #include "common/type/object.h"
17 #include "storage/remote/protocol.h"
18 #include "storage/remote/read.h"
19 #include "storage/read.intern.h"
20 
/***********************************************************************************************************************************
Object type

State for reading one file from a remote storage through a protocol client. File data is received as a series of blocks; the
current block is kept here until all of it has been copied into the caller's buffer.
***********************************************************************************************************************************/
typedef struct StorageReadRemote
{
    MemContext *memContext;                                         // Object mem context (received blocks are allocated here)
    StorageReadInterface interface;                                 // Interface
    StorageRemote *storage;                                         // Storage that created this object
    StorageRead *read;                                              // Storage read interface

    ProtocolClient *client;                                         // Protocol client for requests
    size_t remaining;                                               // Bytes of the block not yet copied to the caller's buffer
    Buffer *block;                                                  // Block currently being read (NULL when none is held)
    bool eof;                                                       // Has the file reached eof? Set when the remote ends the data

#ifdef DEBUG
    uint64_t protocolReadBytes;                                     // How many bytes were read from the protocol layer?
#endif
} StorageReadRemote;
40 
/***********************************************************************************************************************************
Macros for function logging

Type and formatter for StorageReadRemote, used in this file where STORAGE_READ_REMOTE is passed to FUNCTION_LOG_PARAM() and
FUNCTION_TEST_PARAM() (presumably resolved by name by the logging macros -- confirm in common/debug.h). The object is rendered by
objToLog() under the name "StorageReadRemote".
***********************************************************************************************************************************/
#define FUNCTION_LOG_STORAGE_READ_REMOTE_TYPE                                                                                      \
    StorageReadRemote *
#define FUNCTION_LOG_STORAGE_READ_REMOTE_FORMAT(value, buffer, bufferSize)                                                         \
    objToLog(value, "StorageReadRemote", buffer, bufferSize)
48 
49 /***********************************************************************************************************************************
50 Open the file
51 ***********************************************************************************************************************************/
52 static bool
storageReadRemoteOpen(THIS_VOID)53 storageReadRemoteOpen(THIS_VOID)
54 {
55     THIS(StorageReadRemote);
56 
57     FUNCTION_LOG_BEGIN(logLevelTrace);
58         FUNCTION_LOG_PARAM(STORAGE_READ_REMOTE, this);
59     FUNCTION_LOG_END();
60 
61     ASSERT(this != NULL);
62 
63     bool result = false;
64 
65     MEM_CONTEXT_TEMP_BEGIN()
66     {
67         // If the file is compressible add compression filter on the remote
68         if (this->interface.compressible)
69         {
70             ioFilterGroupAdd(
71                 ioReadFilterGroup(storageReadIo(this->read)), compressFilter(compressTypeGz, (int)this->interface.compressLevel));
72         }
73 
74         ProtocolCommand *command = protocolCommandNew(PROTOCOL_COMMAND_STORAGE_OPEN_READ);
75         PackWrite *const param = protocolCommandParam(command);
76 
77         pckWriteStrP(param, this->interface.name);
78         pckWriteBoolP(param, this->interface.ignoreMissing);
79         pckWriteStrP(param, jsonFromVar(this->interface.limit));
80         pckWriteStrP(param, jsonFromVar(ioFilterGroupParamAll(ioReadFilterGroup(storageReadIo(this->read)))));
81 
82         protocolClientCommandPut(this->client, command);
83 
84         // If the file exists
85         result = pckReadBoolP(protocolClientDataGet(this->client));
86 
87         if (result)
88         {
89             // Clear filters since they will be run on the remote side
90             ioFilterGroupClear(ioReadFilterGroup(storageReadIo(this->read)));
91 
92             // If the file is compressible add decompression filter locally
93             if (this->interface.compressible)
94                 ioFilterGroupAdd(ioReadFilterGroup(storageReadIo(this->read)), decompressFilter(compressTypeGz));
95         }
96         // Else nothing to do
97         else
98             protocolClientDataEndGet(this->client);
99     }
100     MEM_CONTEXT_TEMP_END();
101 
102     FUNCTION_LOG_RETURN(BOOL, result);
103 }
104 
105 /***********************************************************************************************************************************
106 Read from a file
107 ***********************************************************************************************************************************/
108 static size_t
storageReadRemote(THIS_VOID,Buffer * buffer,bool block)109 storageReadRemote(THIS_VOID, Buffer *buffer, bool block)
110 {
111     THIS(StorageReadRemote);
112 
113     FUNCTION_LOG_BEGIN(logLevelTrace);
114         FUNCTION_LOG_PARAM(STORAGE_READ_REMOTE, this);
115         FUNCTION_LOG_PARAM(BUFFER, buffer);
116         FUNCTION_LOG_PARAM(BOOL, block);
117     FUNCTION_LOG_END();
118 
119     ASSERT(this != NULL);
120     ASSERT(buffer != NULL && !bufFull(buffer));
121 
122     size_t result = 0;
123 
124     // Read if eof has not been reached
125     if (!this->eof)
126     {
127         do
128         {
129             // If no bytes remaining then read a new block
130             if (this->remaining == 0)
131             {
132                 MEM_CONTEXT_TEMP_BEGIN()
133                 {
134                     PackRead *const read = protocolClientDataGet(this->client);
135                     pckReadNext(read);
136 
137                     // If binary then read the next block
138                     if (pckReadType(read) == pckTypeBin)
139                     {
140                         MEM_CONTEXT_BEGIN(this->memContext)
141                         {
142                             this->block = pckReadBinP(read);
143                             this->remaining = bufUsed(this->block);
144                         }
145                         MEM_CONTEXT_END();
146                     }
147                     // Else read is complete and get the filter list
148                     else
149                     {
150                         bufFree(this->block);
151 
152                         ioFilterGroupResultAllSet(ioReadFilterGroup(storageReadIo(this->read)), jsonToVar(pckReadStrP(read)));
153                         this->eof = true;
154 
155                         protocolClientDataEndGet(this->client);
156                     }
157 
158 #ifdef DEBUG
159                     this->protocolReadBytes += this->remaining;
160 #endif
161                 }
162                 MEM_CONTEXT_TEMP_END();
163             }
164 
165             // Read if not eof
166             if (!this->eof)
167             {
168                 // If the buffer can contain all remaining bytes
169                 if (bufRemains(buffer) >= this->remaining)
170                 {
171                     bufCatSub(buffer, this->block, bufUsed(this->block) - this->remaining, this->remaining);
172 
173                     this->remaining = 0;
174                     bufFree(this->block);
175                     this->block = NULL;
176                 }
177                 // Else read what we can
178                 else
179                 {
180                     size_t remains = bufRemains(buffer);
181                     bufCatSub(buffer, this->block, bufUsed(this->block) - this->remaining, remains);
182                     this->remaining -= remains;
183                 }
184             }
185         }
186         while (!this->eof && !bufFull(buffer));
187     }
188 
189     FUNCTION_LOG_RETURN(SIZE, result);
190 }
191 
192 /***********************************************************************************************************************************
193 Has file reached EOF?
194 ***********************************************************************************************************************************/
195 static bool
storageReadRemoteEof(THIS_VOID)196 storageReadRemoteEof(THIS_VOID)
197 {
198     THIS(StorageReadRemote);
199 
200     FUNCTION_TEST_BEGIN();
201         FUNCTION_TEST_PARAM(STORAGE_READ_REMOTE, this);
202     FUNCTION_TEST_END();
203 
204     ASSERT(this != NULL);
205 
206     FUNCTION_TEST_RETURN(this->eof);
207 }
208 
209 /**********************************************************************************************************************************/
210 StorageRead *
storageReadRemoteNew(StorageRemote * storage,ProtocolClient * client,const String * name,bool ignoreMissing,bool compressible,unsigned int compressLevel,const Variant * limit)211 storageReadRemoteNew(
212     StorageRemote *storage, ProtocolClient *client, const String *name, bool ignoreMissing, bool compressible,
213     unsigned int compressLevel, const Variant *limit)
214 {
215     FUNCTION_LOG_BEGIN(logLevelTrace);
216         FUNCTION_LOG_PARAM(STORAGE_REMOTE, storage);
217         FUNCTION_LOG_PARAM(PROTOCOL_CLIENT, client);
218         FUNCTION_LOG_PARAM(STRING, name);
219         FUNCTION_LOG_PARAM(BOOL, ignoreMissing);
220         FUNCTION_LOG_PARAM(BOOL, compressible);
221         FUNCTION_LOG_PARAM(UINT, compressLevel);
222         FUNCTION_LOG_PARAM(VARIANT, limit);
223     FUNCTION_LOG_END();
224 
225     ASSERT(storage != NULL);
226     ASSERT(client != NULL);
227     ASSERT(name != NULL);
228 
229     StorageReadRemote *this = NULL;
230 
231     MEM_CONTEXT_NEW_BEGIN("StorageReadRemote")
232     {
233         this = memNew(sizeof(StorageReadRemote));
234 
235         *this = (StorageReadRemote)
236         {
237             .memContext = MEM_CONTEXT_NEW(),
238             .storage = storage,
239             .client = client,
240 
241             .interface = (StorageReadInterface)
242             {
243                 .type = STORAGE_REMOTE_TYPE,
244                 .name = strDup(name),
245                 .compressible = compressible,
246                 .compressLevel = compressLevel,
247                 .ignoreMissing = ignoreMissing,
248                 .limit = varDup(limit),
249 
250                 .ioInterface = (IoReadInterface)
251                 {
252                     .eof = storageReadRemoteEof,
253                     .open = storageReadRemoteOpen,
254                     .read = storageReadRemote,
255                 },
256             },
257         };
258 
259         this->read = storageReadNew(this, &this->interface);
260     }
261     MEM_CONTEXT_NEW_END();
262 
263     ASSERT(this != NULL);
264     FUNCTION_LOG_RETURN(STORAGE_READ, this->read);
265 }
266