1 /***********************************************************************************************************************************
2 Protocol Server
3 ***********************************************************************************************************************************/
4 #include "build.auto.h"
5 
6 #include <string.h>
7 
8 #include "common/debug.h"
9 #include "common/log.h"
10 #include "common/memContext.h"
11 #include "common/time.h"
12 #include "common/type/json.h"
13 #include "common/type/keyValue.h"
14 #include "common/type/list.h"
15 #include "protocol/helper.h"
16 #include "protocol/server.h"
17 #include "version.h"
18 
19 /***********************************************************************************************************************************
20 Object type
21 ***********************************************************************************************************************************/
22 struct ProtocolServer
23 {
24     MemContext *memContext;                                         // Mem context
25     IoRead *read;                                                   // Read interface
26     IoWrite *write;                                                 // Write interface
27     const String *name;                                             // Name displayed in logging
28 };
29 
30 /**********************************************************************************************************************************/
31 ProtocolServer *
protocolServerNew(const String * name,const String * service,IoRead * read,IoWrite * write)32 protocolServerNew(const String *name, const String *service, IoRead *read, IoWrite *write)
33 {
34     FUNCTION_LOG_BEGIN(logLevelTrace);
35         FUNCTION_LOG_PARAM(STRING, name);
36         FUNCTION_LOG_PARAM(STRING, service);
37         FUNCTION_LOG_PARAM(IO_READ, read);
38         FUNCTION_LOG_PARAM(IO_WRITE, write);
39     FUNCTION_LOG_END();
40 
41     ASSERT(name != NULL);
42     ASSERT(read != NULL);
43     ASSERT(write != NULL);
44 
45     ProtocolServer *this = NULL;
46 
47     MEM_CONTEXT_NEW_BEGIN("ProtocolServer")
48     {
49         this = memNew(sizeof(ProtocolServer));
50 
51         *this = (ProtocolServer)
52         {
53             .memContext = memContextCurrent(),
54             .read = read,
55             .write = write,
56             .name = strDup(name),
57         };
58 
59         // Send the protocol greeting
60         MEM_CONTEXT_TEMP_BEGIN()
61         {
62             KeyValue *greetingKv = kvNew();
63             kvPut(greetingKv, VARSTR(PROTOCOL_GREETING_NAME_STR), VARSTRZ(PROJECT_NAME));
64             kvPut(greetingKv, VARSTR(PROTOCOL_GREETING_SERVICE_STR), VARSTR(service));
65             kvPut(greetingKv, VARSTR(PROTOCOL_GREETING_VERSION_STR), VARSTRZ(PROJECT_VERSION));
66 
67             ioWriteStrLine(this->write, jsonFromKv(greetingKv));
68             ioWriteFlush(this->write);
69         }
70         MEM_CONTEXT_TEMP_END();
71     }
72     MEM_CONTEXT_NEW_END();
73 
74     FUNCTION_LOG_RETURN(PROTOCOL_SERVER, this);
75 }
76 
77 /**********************************************************************************************************************************/
78 void
protocolServerError(ProtocolServer * this,int code,const String * message,const String * stack)79 protocolServerError(ProtocolServer *this, int code, const String *message, const String *stack)
80 {
81     FUNCTION_LOG_BEGIN(logLevelTrace);
82         FUNCTION_LOG_PARAM(PROTOCOL_SERVER, this);
83         FUNCTION_LOG_PARAM(INT, code);
84         FUNCTION_LOG_PARAM(STRING, message);
85         FUNCTION_LOG_PARAM(STRING, stack);
86     FUNCTION_LOG_END();
87 
88     ASSERT(this != NULL);
89     ASSERT(code != 0);
90     ASSERT(message != NULL);
91     ASSERT(stack != NULL);
92 
93     MEM_CONTEXT_TEMP_BEGIN()
94     {
95         // Write the error and flush to be sure it gets sent immediately
96         PackWrite *error = pckWriteNew(this->write);
97         pckWriteU32P(error, protocolMessageTypeError);
98         pckWriteI32P(error, code);
99         pckWriteStrP(error, message);
100         pckWriteStrP(error, stack);
101         pckWriteEndP(error);
102 
103         ioWriteFlush(this->write);
104     }
105     MEM_CONTEXT_TEMP_END();
106 
107     FUNCTION_LOG_RETURN_VOID();
108 }
109 
110 /**********************************************************************************************************************************/
111 ProtocolServerCommandGetResult
protocolServerCommandGet(ProtocolServer * const this)112 protocolServerCommandGet(ProtocolServer *const this)
113 {
114     FUNCTION_LOG_BEGIN(logLevelTrace);
115         FUNCTION_LOG_PARAM(PROTOCOL_SERVER, this);
116     FUNCTION_LOG_END();
117 
118     ProtocolServerCommandGetResult result = {0};
119 
120     MEM_CONTEXT_TEMP_BEGIN()
121     {
122         PackRead *const command = pckReadNew(this->read);
123         ProtocolMessageType type = (ProtocolMessageType)pckReadU32P(command);
124 
125         CHECK(type == protocolMessageTypeCommand);
126 
127         MEM_CONTEXT_PRIOR_BEGIN()
128         {
129             result.id = pckReadStrIdP(command);
130             result.param = pckReadPackBufP(command);
131         }
132         MEM_CONTEXT_PRIOR_END();
133 
134         pckReadEndP(command);
135     }
136     MEM_CONTEXT_TEMP_END();
137 
138     FUNCTION_LOG_RETURN_STRUCT(result);
139 }
140 
141 /**********************************************************************************************************************************/
142 void
protocolServerProcess(ProtocolServer * this,const VariantList * retryInterval,const ProtocolServerHandler * const handlerList,const unsigned int handlerListSize)143 protocolServerProcess(
144     ProtocolServer *this, const VariantList *retryInterval, const ProtocolServerHandler *const handlerList,
145     const unsigned int handlerListSize)
146 {
147     FUNCTION_LOG_BEGIN(logLevelDebug);
148         FUNCTION_LOG_PARAM(PROTOCOL_SERVER, this);
149         FUNCTION_LOG_PARAM(VARIANT_LIST, retryInterval);
150         FUNCTION_LOG_PARAM_P(VOID, handlerList);
151         FUNCTION_LOG_PARAM(UINT, handlerListSize);
152     FUNCTION_LOG_END();
153 
154     ASSERT(this != NULL);
155     ASSERT(handlerList != NULL);
156     ASSERT(handlerListSize > 0);
157 
158     // Loop until exit command is received
159     bool exit = false;
160 
161     do
162     {
163         TRY_BEGIN()
164         {
165             MEM_CONTEXT_TEMP_BEGIN()
166             {
167                 // Get command
168                 ProtocolServerCommandGetResult command = protocolServerCommandGet(this);
169 
170                 // Find the handler
171                 ProtocolServerCommandHandler handler = NULL;
172 
173                 for (unsigned int handlerIdx = 0; handlerIdx < handlerListSize; handlerIdx++)
174                 {
175                     if (command.id == handlerList[handlerIdx].command)
176                     {
177                         handler = handlerList[handlerIdx].handler;
178                         break;
179                     }
180                 }
181 
182                 // If handler was found then process
183                 if (handler != NULL)
184                 {
185                     // Send the command to the handler.  Run the handler in the server's memory context in case any persistent data
186                     // needs to be stored by the handler.
187                     MEM_CONTEXT_BEGIN(this->memContext)
188                     {
189                         // Initialize retries in case of command failure
190                         bool retry = false;
191                         unsigned int retryRemaining = retryInterval != NULL ? varLstSize(retryInterval) : 0;
192 
193                         // Handler retry loop
194                         do
195                         {
196                             retry = false;
197 
198                             TRY_BEGIN()
199                             {
200                                 handler(pckReadNewBuf(command.param), this);
201                             }
202                             CATCH_ANY()
203                             {
204                                 // Are there retries remaining?
205                                 if (retryRemaining > 0)
206                                 {
207                                     // Get the sleep interval for this retry
208                                     TimeMSec retrySleepMs = varUInt64(
209                                         varLstGet(retryInterval, varLstSize(retryInterval) - retryRemaining));
210 
211                                     // Log the retry
212                                     LOG_DEBUG_FMT(
213                                         "retry %s after %" PRIu64 "ms: %s", errorTypeName(errorType()), retrySleepMs,
214                                         errorMessage());
215 
216                                     // Sleep for interval
217                                     sleepMSec(retrySleepMs);
218 
219                                     // Decrement retries remaining and retry
220                                     retryRemaining--;
221                                     retry = true;
222 
223                                     // Send keep alives to remotes. A retry means the command is taking longer than usual so make
224                                     // sure the remote does not timeout.
225                                     protocolKeepAlive();
226                                 }
227                                 // Else report error to the client
228                                 else
229                                     protocolServerError(this, errorCode(), STR(errorMessage()), STR(errorStackTrace()));
230                             }
231                             TRY_END();
232                         }
233                         while (retry);
234                     }
235                     MEM_CONTEXT_END();
236                 }
237                 // Else check built-in commands
238                 else
239                 {
240                     switch (command.id)
241                     {
242                         case PROTOCOL_COMMAND_EXIT:
243                             exit = true;
244                             break;
245 
246                         case PROTOCOL_COMMAND_NOOP:
247                             protocolServerDataEndPut(this);
248                             break;
249 
250                         default:
251                             THROW_FMT(
252                                 ProtocolError, "invalid command '%s' (0x%" PRIx64 ")", strZ(strIdToStr(command.id)), command.id);
253                     }
254                 }
255 
256                 // Send keep alives to remotes.  When a local process is doing work that does not involve the remote it is important
257                 // that the remote does not timeout.  This will send a keep alive once per unit of work that is performed by the
258                 // local process.
259                 protocolKeepAlive();
260             }
261             MEM_CONTEXT_TEMP_END();
262         }
263         CATCH_ANY()
264         {
265             // Report error to the client
266             protocolServerError(this, errorCode(), STR(errorMessage()), STR(errorStackTrace()));
267 
268             // Rethrow so the process exits with an error
269             RETHROW();
270         }
271         TRY_END();
272     }
273     while (!exit);
274 
275     FUNCTION_LOG_RETURN_VOID();
276 }
277 
278 /**********************************************************************************************************************************/
279 PackRead *
protocolServerDataGet(ProtocolServer * const this)280 protocolServerDataGet(ProtocolServer *const this)
281 {
282     FUNCTION_LOG_BEGIN(logLevelTrace);
283         FUNCTION_LOG_PARAM(PROTOCOL_SERVER, this);
284     FUNCTION_LOG_END();
285 
286     PackRead *result = NULL;
287 
288     MEM_CONTEXT_TEMP_BEGIN()
289     {
290         PackRead *data = pckReadNew(this->read);
291         ProtocolMessageType type = (ProtocolMessageType)pckReadU32P(data);
292 
293         CHECK(type == protocolMessageTypeData);
294 
295         MEM_CONTEXT_PRIOR_BEGIN()
296         {
297             result = pckReadPackP(data);
298         }
299         MEM_CONTEXT_PRIOR_END();
300 
301         pckReadEndP(data);
302     }
303     MEM_CONTEXT_TEMP_END();
304 
305     FUNCTION_LOG_RETURN(PACK_READ, result);
306 }
307 
308 /**********************************************************************************************************************************/
309 void
protocolServerDataPut(ProtocolServer * const this,PackWrite * const data)310 protocolServerDataPut(ProtocolServer *const this, PackWrite *const data)
311 {
312     FUNCTION_LOG_BEGIN(logLevelTrace);
313         FUNCTION_LOG_PARAM(PROTOCOL_SERVER, this);
314         FUNCTION_LOG_PARAM(PACK_WRITE, data);
315     FUNCTION_LOG_END();
316 
317     MEM_CONTEXT_TEMP_BEGIN()
318     {
319         // End the pack
320         if (data != NULL)
321             pckWriteEndP(data);
322 
323         // Write the result
324         PackWrite *resultMessage = pckWriteNew(this->write);
325         pckWriteU32P(resultMessage, protocolMessageTypeData, .defaultWrite = true);
326         pckWritePackP(resultMessage, data);
327         pckWriteEndP(resultMessage);
328 
329         // Flush on NULL result since it might be used to synchronize
330         if (data == NULL)
331             ioWriteFlush(this->write);
332     }
333     MEM_CONTEXT_TEMP_END();
334 
335     FUNCTION_LOG_RETURN_VOID();
336 }
337 
338 /**********************************************************************************************************************************/
339 void
protocolServerDataEndPut(ProtocolServer * const this)340 protocolServerDataEndPut(ProtocolServer *const this)
341 {
342     FUNCTION_LOG_BEGIN(logLevelTrace);
343         FUNCTION_LOG_PARAM(PROTOCOL_SERVER, this);
344     FUNCTION_LOG_END();
345 
346     MEM_CONTEXT_TEMP_BEGIN()
347     {
348         // Write the response and flush to be sure it gets sent immediately
349         PackWrite *response = pckWriteNew(this->write);
350         pckWriteU32P(response, protocolMessageTypeDataEnd, .defaultWrite = true);
351         pckWriteEndP(response);
352     }
353     MEM_CONTEXT_TEMP_END();
354 
355     ioWriteFlush(this->write);
356 
357     FUNCTION_LOG_RETURN_VOID();
358 }
359 
360 /**********************************************************************************************************************************/
361 String *
protocolServerToLog(const ProtocolServer * this)362 protocolServerToLog(const ProtocolServer *this)
363 {
364     return strNewFmt("{name: %s}", strZ(this->name));
365 }
366