1 /***********************************************************************************************************************************
2 Protocol Server
3 ***********************************************************************************************************************************/
4 #include "build.auto.h"
5
6 #include <string.h>
7
8 #include "common/debug.h"
9 #include "common/log.h"
10 #include "common/memContext.h"
11 #include "common/time.h"
12 #include "common/type/json.h"
13 #include "common/type/keyValue.h"
14 #include "common/type/list.h"
15 #include "protocol/helper.h"
16 #include "protocol/server.h"
17 #include "version.h"
18
19 /***********************************************************************************************************************************
20 Object type
21 ***********************************************************************************************************************************/
22 struct ProtocolServer
23 {
24 MemContext *memContext; // Mem context
25 IoRead *read; // Read interface
26 IoWrite *write; // Write interface
27 const String *name; // Name displayed in logging
28 };
29
30 /**********************************************************************************************************************************/
31 ProtocolServer *
protocolServerNew(const String * name,const String * service,IoRead * read,IoWrite * write)32 protocolServerNew(const String *name, const String *service, IoRead *read, IoWrite *write)
33 {
34 FUNCTION_LOG_BEGIN(logLevelTrace);
35 FUNCTION_LOG_PARAM(STRING, name);
36 FUNCTION_LOG_PARAM(STRING, service);
37 FUNCTION_LOG_PARAM(IO_READ, read);
38 FUNCTION_LOG_PARAM(IO_WRITE, write);
39 FUNCTION_LOG_END();
40
41 ASSERT(name != NULL);
42 ASSERT(read != NULL);
43 ASSERT(write != NULL);
44
45 ProtocolServer *this = NULL;
46
47 MEM_CONTEXT_NEW_BEGIN("ProtocolServer")
48 {
49 this = memNew(sizeof(ProtocolServer));
50
51 *this = (ProtocolServer)
52 {
53 .memContext = memContextCurrent(),
54 .read = read,
55 .write = write,
56 .name = strDup(name),
57 };
58
59 // Send the protocol greeting
60 MEM_CONTEXT_TEMP_BEGIN()
61 {
62 KeyValue *greetingKv = kvNew();
63 kvPut(greetingKv, VARSTR(PROTOCOL_GREETING_NAME_STR), VARSTRZ(PROJECT_NAME));
64 kvPut(greetingKv, VARSTR(PROTOCOL_GREETING_SERVICE_STR), VARSTR(service));
65 kvPut(greetingKv, VARSTR(PROTOCOL_GREETING_VERSION_STR), VARSTRZ(PROJECT_VERSION));
66
67 ioWriteStrLine(this->write, jsonFromKv(greetingKv));
68 ioWriteFlush(this->write);
69 }
70 MEM_CONTEXT_TEMP_END();
71 }
72 MEM_CONTEXT_NEW_END();
73
74 FUNCTION_LOG_RETURN(PROTOCOL_SERVER, this);
75 }
76
77 /**********************************************************************************************************************************/
78 void
protocolServerError(ProtocolServer * this,int code,const String * message,const String * stack)79 protocolServerError(ProtocolServer *this, int code, const String *message, const String *stack)
80 {
81 FUNCTION_LOG_BEGIN(logLevelTrace);
82 FUNCTION_LOG_PARAM(PROTOCOL_SERVER, this);
83 FUNCTION_LOG_PARAM(INT, code);
84 FUNCTION_LOG_PARAM(STRING, message);
85 FUNCTION_LOG_PARAM(STRING, stack);
86 FUNCTION_LOG_END();
87
88 ASSERT(this != NULL);
89 ASSERT(code != 0);
90 ASSERT(message != NULL);
91 ASSERT(stack != NULL);
92
93 MEM_CONTEXT_TEMP_BEGIN()
94 {
95 // Write the error and flush to be sure it gets sent immediately
96 PackWrite *error = pckWriteNew(this->write);
97 pckWriteU32P(error, protocolMessageTypeError);
98 pckWriteI32P(error, code);
99 pckWriteStrP(error, message);
100 pckWriteStrP(error, stack);
101 pckWriteEndP(error);
102
103 ioWriteFlush(this->write);
104 }
105 MEM_CONTEXT_TEMP_END();
106
107 FUNCTION_LOG_RETURN_VOID();
108 }
109
110 /**********************************************************************************************************************************/
111 ProtocolServerCommandGetResult
protocolServerCommandGet(ProtocolServer * const this)112 protocolServerCommandGet(ProtocolServer *const this)
113 {
114 FUNCTION_LOG_BEGIN(logLevelTrace);
115 FUNCTION_LOG_PARAM(PROTOCOL_SERVER, this);
116 FUNCTION_LOG_END();
117
118 ProtocolServerCommandGetResult result = {0};
119
120 MEM_CONTEXT_TEMP_BEGIN()
121 {
122 PackRead *const command = pckReadNew(this->read);
123 ProtocolMessageType type = (ProtocolMessageType)pckReadU32P(command);
124
125 CHECK(type == protocolMessageTypeCommand);
126
127 MEM_CONTEXT_PRIOR_BEGIN()
128 {
129 result.id = pckReadStrIdP(command);
130 result.param = pckReadPackBufP(command);
131 }
132 MEM_CONTEXT_PRIOR_END();
133
134 pckReadEndP(command);
135 }
136 MEM_CONTEXT_TEMP_END();
137
138 FUNCTION_LOG_RETURN_STRUCT(result);
139 }
140
141 /**********************************************************************************************************************************/
142 void
protocolServerProcess(ProtocolServer * this,const VariantList * retryInterval,const ProtocolServerHandler * const handlerList,const unsigned int handlerListSize)143 protocolServerProcess(
144 ProtocolServer *this, const VariantList *retryInterval, const ProtocolServerHandler *const handlerList,
145 const unsigned int handlerListSize)
146 {
147 FUNCTION_LOG_BEGIN(logLevelDebug);
148 FUNCTION_LOG_PARAM(PROTOCOL_SERVER, this);
149 FUNCTION_LOG_PARAM(VARIANT_LIST, retryInterval);
150 FUNCTION_LOG_PARAM_P(VOID, handlerList);
151 FUNCTION_LOG_PARAM(UINT, handlerListSize);
152 FUNCTION_LOG_END();
153
154 ASSERT(this != NULL);
155 ASSERT(handlerList != NULL);
156 ASSERT(handlerListSize > 0);
157
158 // Loop until exit command is received
159 bool exit = false;
160
161 do
162 {
163 TRY_BEGIN()
164 {
165 MEM_CONTEXT_TEMP_BEGIN()
166 {
167 // Get command
168 ProtocolServerCommandGetResult command = protocolServerCommandGet(this);
169
170 // Find the handler
171 ProtocolServerCommandHandler handler = NULL;
172
173 for (unsigned int handlerIdx = 0; handlerIdx < handlerListSize; handlerIdx++)
174 {
175 if (command.id == handlerList[handlerIdx].command)
176 {
177 handler = handlerList[handlerIdx].handler;
178 break;
179 }
180 }
181
182 // If handler was found then process
183 if (handler != NULL)
184 {
185 // Send the command to the handler. Run the handler in the server's memory context in case any persistent data
186 // needs to be stored by the handler.
187 MEM_CONTEXT_BEGIN(this->memContext)
188 {
189 // Initialize retries in case of command failure
190 bool retry = false;
191 unsigned int retryRemaining = retryInterval != NULL ? varLstSize(retryInterval) : 0;
192
193 // Handler retry loop
194 do
195 {
196 retry = false;
197
198 TRY_BEGIN()
199 {
200 handler(pckReadNewBuf(command.param), this);
201 }
202 CATCH_ANY()
203 {
204 // Are there retries remaining?
205 if (retryRemaining > 0)
206 {
207 // Get the sleep interval for this retry
208 TimeMSec retrySleepMs = varUInt64(
209 varLstGet(retryInterval, varLstSize(retryInterval) - retryRemaining));
210
211 // Log the retry
212 LOG_DEBUG_FMT(
213 "retry %s after %" PRIu64 "ms: %s", errorTypeName(errorType()), retrySleepMs,
214 errorMessage());
215
216 // Sleep for interval
217 sleepMSec(retrySleepMs);
218
219 // Decrement retries remaining and retry
220 retryRemaining--;
221 retry = true;
222
223 // Send keep alives to remotes. A retry means the command is taking longer than usual so make
224 // sure the remote does not timeout.
225 protocolKeepAlive();
226 }
227 // Else report error to the client
228 else
229 protocolServerError(this, errorCode(), STR(errorMessage()), STR(errorStackTrace()));
230 }
231 TRY_END();
232 }
233 while (retry);
234 }
235 MEM_CONTEXT_END();
236 }
237 // Else check built-in commands
238 else
239 {
240 switch (command.id)
241 {
242 case PROTOCOL_COMMAND_EXIT:
243 exit = true;
244 break;
245
246 case PROTOCOL_COMMAND_NOOP:
247 protocolServerDataEndPut(this);
248 break;
249
250 default:
251 THROW_FMT(
252 ProtocolError, "invalid command '%s' (0x%" PRIx64 ")", strZ(strIdToStr(command.id)), command.id);
253 }
254 }
255
256 // Send keep alives to remotes. When a local process is doing work that does not involve the remote it is important
257 // that the remote does not timeout. This will send a keep alive once per unit of work that is performed by the
258 // local process.
259 protocolKeepAlive();
260 }
261 MEM_CONTEXT_TEMP_END();
262 }
263 CATCH_ANY()
264 {
265 // Report error to the client
266 protocolServerError(this, errorCode(), STR(errorMessage()), STR(errorStackTrace()));
267
268 // Rethrow so the process exits with an error
269 RETHROW();
270 }
271 TRY_END();
272 }
273 while (!exit);
274
275 FUNCTION_LOG_RETURN_VOID();
276 }
277
278 /**********************************************************************************************************************************/
279 PackRead *
protocolServerDataGet(ProtocolServer * const this)280 protocolServerDataGet(ProtocolServer *const this)
281 {
282 FUNCTION_LOG_BEGIN(logLevelTrace);
283 FUNCTION_LOG_PARAM(PROTOCOL_SERVER, this);
284 FUNCTION_LOG_END();
285
286 PackRead *result = NULL;
287
288 MEM_CONTEXT_TEMP_BEGIN()
289 {
290 PackRead *data = pckReadNew(this->read);
291 ProtocolMessageType type = (ProtocolMessageType)pckReadU32P(data);
292
293 CHECK(type == protocolMessageTypeData);
294
295 MEM_CONTEXT_PRIOR_BEGIN()
296 {
297 result = pckReadPackP(data);
298 }
299 MEM_CONTEXT_PRIOR_END();
300
301 pckReadEndP(data);
302 }
303 MEM_CONTEXT_TEMP_END();
304
305 FUNCTION_LOG_RETURN(PACK_READ, result);
306 }
307
308 /**********************************************************************************************************************************/
309 void
protocolServerDataPut(ProtocolServer * const this,PackWrite * const data)310 protocolServerDataPut(ProtocolServer *const this, PackWrite *const data)
311 {
312 FUNCTION_LOG_BEGIN(logLevelTrace);
313 FUNCTION_LOG_PARAM(PROTOCOL_SERVER, this);
314 FUNCTION_LOG_PARAM(PACK_WRITE, data);
315 FUNCTION_LOG_END();
316
317 MEM_CONTEXT_TEMP_BEGIN()
318 {
319 // End the pack
320 if (data != NULL)
321 pckWriteEndP(data);
322
323 // Write the result
324 PackWrite *resultMessage = pckWriteNew(this->write);
325 pckWriteU32P(resultMessage, protocolMessageTypeData, .defaultWrite = true);
326 pckWritePackP(resultMessage, data);
327 pckWriteEndP(resultMessage);
328
329 // Flush on NULL result since it might be used to synchronize
330 if (data == NULL)
331 ioWriteFlush(this->write);
332 }
333 MEM_CONTEXT_TEMP_END();
334
335 FUNCTION_LOG_RETURN_VOID();
336 }
337
338 /**********************************************************************************************************************************/
339 void
protocolServerDataEndPut(ProtocolServer * const this)340 protocolServerDataEndPut(ProtocolServer *const this)
341 {
342 FUNCTION_LOG_BEGIN(logLevelTrace);
343 FUNCTION_LOG_PARAM(PROTOCOL_SERVER, this);
344 FUNCTION_LOG_END();
345
346 MEM_CONTEXT_TEMP_BEGIN()
347 {
348 // Write the response and flush to be sure it gets sent immediately
349 PackWrite *response = pckWriteNew(this->write);
350 pckWriteU32P(response, protocolMessageTypeDataEnd, .defaultWrite = true);
351 pckWriteEndP(response);
352 }
353 MEM_CONTEXT_TEMP_END();
354
355 ioWriteFlush(this->write);
356
357 FUNCTION_LOG_RETURN_VOID();
358 }
359
360 /**********************************************************************************************************************************/
361 String *
protocolServerToLog(const ProtocolServer * this)362 protocolServerToLog(const ProtocolServer *this)
363 {
364 return strNewFmt("{name: %s}", strZ(this->name));
365 }
366