1 #include "SQLiteServerLoggerPlugin.h"
2 #include "RakPeerInterface.h"
3 #include "PacketizedTCP.h"
4 #include "MessageIdentifiers.h"
5 #include "SQLiteLoggerCommon.h"
6 #include "jpeglib.h"
7 #include "jpeg_memory_dest.h"
8 #include "FileOperations.h"
9 #include "GetTime.h"
10 #include <time.h>
11 #include <stdio.h>
12 #include <sys/types.h>
13 #include <sys/timeb.h>
14 #include "DXTCompressor.h"
15
16 // http://web.utk.edu/~jplyon/sqlite/SQLite_optimization_FAQ.html
17
18 // See jmorecfg.h
19 /*
20 * Ordering of RGB data in scanlines passed to or from the application.
21 * If your application wants to deal with data in the order B,G,R, just
22 * change these macros. You can also deal with formats such as R,G,B,X
23 * (one extra byte per pixel) by changing RGB_PIXELSIZE. Note that changing
24 * the offsets will also change the order in which colormap data is organized.
25 * RESTRICTIONS:
26 * 1. The sample applications cjpeg,djpeg do NOT support modified RGB formats.
27 * 2. These macros only affect RGB<=>YCbCr color conversion, so they are not
28 * useful if you are using JPEG color spaces other than YCbCr or grayscale.
29 * 3. The color quantizer modules will not behave desirably if RGB_PIXELSIZE
30 * is not 3 (they don't understand about dummy color components!). So you
31 * can't use color quantization if you change that value.
32 */
33
34 //#define RGB_RED 0 /* Offset of Red in an RGB scanline element */
35 //#define RGB_GREEN 1 /* Offset of Green */
36 //#define RGB_BLUE 2 /* Offset of Blue */
37 //#define RGB_PIXELSIZE 3 /* JSAMPLEs per RGB scanline element */
38
39 using namespace RakNet;
40
41 // JPEG ENCODING ERRORS
42 struct my_error_mgr {
43 struct jpeg_error_mgr pub;
44 };
my_error_exit(j_common_ptr cinfo)45 METHODDEF(void) my_error_exit (j_common_ptr cinfo) {}
46
47 #define FILE_COLUMN "SourceFile"
48 #define LINE_COLUMN "SourceLine"
49 #define TICK_COUNT_COLUMN "AutoTickCount"
50 #define AUTO_IP_COLUMN "AutoIPAddress"
51 #define TIMESTAMP_NUMERIC_COLUMN "TimestampNumeric"
52 #define TIMESTAMP_TEXT_COLUMN "TimestampText"
53 #define FUNCTION_CALL_FRIENDLY_TEXT "FunctionCallFriendlyText"
54 #define FUNCTION_CALL_TABLE "'functionCalls'"
55 #define FUNCTION_CALL_PARAMETERS_TABLE "'functionCallParameters'"
56
57 // Store packets in the CPUThreadInput until at most this much time has elapsed. This is so
58 // batch processing can occur on multiple image sources at once
59 static const RakNetTime MAX_TIME_TO_BUFFER_PACKETS=1000;
60
61 // WTF am I getting this?
62 // 2>SQLiteServerLoggerPlugin.obj : error LNK2019: unresolved external symbol _GetSqlDataTypeName referenced in function "struct RakNet::SQLite3ServerPlugin::SQLExecThreadOutput __cdecl ExecSQLLoggingThread(struct RakNet::SQLite3ServerPlugin::SQLExecThreadInput,bool *,void *)" (?ExecSQLLoggingThread@@YA?AUExecThreadOutput@SQLite3ServerPlugin@RakNet@@UExecThreadInput@23@PA_NPAX@Z)
63 // 2>C:\RakNet\Debug\SQLiteServerLogger.exe : fatal error LNK1120: 1 unresolved externals
64 static const char *sqlDataTypeNames[SQLLPDT_COUNT] =
65 {
66 "INTEGER",
67 "INTEGER",
68 "NUMERIC",
69 "TEXT",
70 "BLOB",
71 "BLOB",
72 };
GetSqlDataTypeName2(SQLLoggerPrimaryDataType idx)73 const char *GetSqlDataTypeName2(SQLLoggerPrimaryDataType idx) {return sqlDataTypeNames[(int)idx];}
74
CompressAsJpeg(char ** cptrInOut,uint32_t * sizeInOut,uint16_t imageWidth,uint16_t imageHeight,int16_t linePitch,unsigned char input_components)75 void CompressAsJpeg(char **cptrInOut, uint32_t *sizeInOut, uint16_t imageWidth, uint16_t imageHeight, int16_t linePitch, unsigned char input_components)
76 {
77 RakNetTimeUS t1=RakNet::GetTimeUS();
78
79 // Compress to jpg
80 // http://www.google.com/codesearch/p?hl=en#I-_InJ6STRE/gthumb-1.108/libgthumb/pixbuf-utils.c&q=jpeg_create_compress
81 // http://ftp.gnome.org / pub/ GNOME / sources /gthumb /1.108/ gthumb-1.108.tar.gz/
82
83 struct jpeg_compress_struct cinfo;
84 struct my_error_mgr jerr;
85 cinfo.err = jpeg_std_error(&jerr.pub);
86 jerr.pub.error_exit = my_error_exit;
87 jpeg_create_compress(&cinfo);
88 cinfo.smoothing_factor = 0;
89 cinfo.optimize_coding = false;
90 cinfo.image_width = imageWidth;
91 cinfo.image_height = imageHeight;
92 cinfo.input_components = input_components;
93 cinfo.in_color_space = JCS_RGB;
94
95
96 jpeg_set_defaults (&cinfo);
97 cinfo.dct_method=JDCT_FLOAT;
98 // Not sure why they made RGB_PIXELSIZE a global define. I added this so it only uses my value in this one place, to not break other things
99 cinfo.hack_use_input_components_as_RGB_PIXELSIZE=1;
100 jpeg_set_quality (&cinfo, 75, TRUE);
101 int jpegSizeAfterCompression = 0; //size of jpeg after compression
102 char * storage = (char*) rakMalloc_Ex(*sizeInOut+50000,__FILE__,__LINE__);
103 jpeg_memory_dest(&cinfo,(JOCTET*)storage,*sizeInOut+50000,&jpegSizeAfterCompression);
104
105 JSAMPROW row_pointer[1];
106 jpeg_start_compress (&cinfo, TRUE);
107 while( cinfo.next_scanline < cinfo.image_height )
108 {
109 row_pointer[0] = (JSAMPROW) &((*cptrInOut)[cinfo.next_scanline * linePitch ]);
110 jpeg_write_scanlines(&cinfo, row_pointer, 1);
111
112 }
113
114 /* finish off */
115 jpeg_finish_compress (&cinfo);
116 jpeg_destroy_compress(&cinfo);
117
118 rakFree_Ex(*cptrInOut,__FILE__,__LINE__);
119 *cptrInOut = (char*) rakRealloc_Ex(storage, jpegSizeAfterCompression,__FILE__,__LINE__);
120 *sizeInOut=jpegSizeAfterCompression;
121
122 RakNetTimeUS t2=RakNet::GetTimeUS();
123 RakNetTimeUS diff=t2-t1;
124 }
125 static bool needsDxtInit=true;
126 static bool dxtCompressionSupported=false;
InitDxt()127 void* InitDxt()
128 {
129 if (needsDxtInit==true)
130 {
131 dxtCompressionSupported=DXTCompressor::Initialize();
132 if (dxtCompressionSupported==false)
133 {
134 printf("Warning, DXT compressor failed to start.\nImages will be compressed with jpg instead.\n");
135 }
136 }
137 needsDxtInit=false;
138 return 0;
139 }
DeinitDxt(void *)140 void DeinitDxt(void*)
141 {
142 if (dxtCompressionSupported)
143 DXTCompressor::Shutdown();
144 needsDxtInit=true;
145 dxtCompressionSupported=false;
146 }
ExecCPULoggingThread(SQLiteServerLoggerPlugin::CPUThreadInput * cpuThreadInput,bool * returnOutput,void * perThreadData)147 SQLiteServerLoggerPlugin::CPUThreadOutput* ExecCPULoggingThread(SQLiteServerLoggerPlugin::CPUThreadInput* cpuThreadInput, bool *returnOutput, void* perThreadData)
148 {
149 int i;
150 *returnOutput=true;
151 SQLiteServerLoggerPlugin::CPUThreadOutput *cpuThreadOutput = RakNet::OP_NEW<SQLiteServerLoggerPlugin::CPUThreadOutput>(__FILE__,__LINE__);
152 cpuThreadOutput->arraySize=cpuThreadInput->arraySize;
153 //cpuThreadOutput->cpuOutputNodeArray=RakNet::OP_NEW_ARRAY<SQLiteServerLoggerPlugin::CPUThreadOutputNode*>(cpuThreadInput->arraySize,__FILE__,__LINE__);
154 //printf("1. arraySize=%i, ",cpuThreadInput->arraySize);
155 for (i=0; i<cpuThreadInput->arraySize; i++)
156 {
157 cpuThreadOutput->cpuOutputNodeArray[i]=RakNet::OP_NEW<SQLiteServerLoggerPlugin::CPUThreadOutputNode>(__FILE__,__LINE__);
158 SQLiteServerLoggerPlugin::CPUThreadOutputNode *outputNode = cpuThreadOutput->cpuOutputNodeArray[i];
159 Packet *packet = cpuThreadInput->cpuInputArray[i].packet;
160 RakNet::RakString dbIdentifier = cpuThreadInput->cpuInputArray[i].dbIdentifier;
161 // outputNode->whenMessageArrived = cpuThreadInput->cpuInputArray[i].whenMessageArrived;
162 outputNode->packet=packet;
163
164 packet->systemAddress.ToString(true,outputNode->ipAddressString);
165 RakNet::BitStream bitStream(packet->data, packet->length, false);
166 bitStream.IgnoreBytes(1);
167 bitStream.Read(outputNode->dbIdentifier);
168 bitStream.Read(outputNode->tableName);
169 outputNode->tableName.SQLEscape();
170 bitStream.Read(outputNode->line);
171 bitStream.Read(outputNode->file);
172 bitStream.Read(outputNode->tickCount);
173 bitStream.Read(outputNode->clientSendingTime);
174 bitStream.Read(outputNode->isFunctionCall);
175 bitStream.Read(outputNode->parameterCount);
176 if (outputNode->isFunctionCall==false)
177 {
178 RakNet::RakString columnName;
179 // printf("2. parameterCount=%i, ",outputNode->parameterCount);
180 for (int i=0; i < outputNode->parameterCount; i++)
181 {
182 bitStream.Read(columnName);
183 columnName.SQLEscape();
184 columnName.RemoveCharacter(' ');
185 outputNode->insertingColumnNames.Push(columnName, __FILE__, __LINE__ );
186 }
187 }
188
189 int parameterCountIndex=0;
190 // printf("3. parameterCount=%i, ",outputNode->parameterCount);
191 while (parameterCountIndex < outputNode->parameterCount)
192 {
193 outputNode->parameterList[parameterCountIndex].Deserialize(&bitStream);
194
195 if (outputNode->parameterList[parameterCountIndex].size>0)
196 {
197 parameterCountIndex++;
198 }
199 else
200 {
201 // Skip empty parameters
202 if (outputNode->isFunctionCall==false)
203 outputNode->insertingColumnNames.RemoveAtIndex(parameterCountIndex);
204 outputNode->parameterCount--;
205 }
206 }
207
208 // sqlite3_stmt *statement;
209 // char *errorMsg;
210 // printf("4. parameterCount=%i, ",outputNode->parameterCount);
211 for (parameterCountIndex=0; parameterCountIndex < outputNode->parameterCount; parameterCountIndex++)
212 {
213 if (outputNode->parameterList[parameterCountIndex].type==SQLLPDT_IMAGE)
214 {
215
216 bool dxtCompressSuccess=false;
217 if (dxtCompressionSupported)
218 {
219 char *outputData;
220 int bufferSize = DXTCompressor::GetBufferSize(DXT1,
221 outputNode->parameterList[parameterCountIndex].imageWidth,
222 outputNode->parameterList[parameterCountIndex].imageHeight);
223 int ddsHeaderSize = DXTCompressor::GetDDSHeaderSize();
224 outputData = (char*) rakMalloc_Ex(bufferSize + ddsHeaderSize, __FILE__, __LINE__ );
225 dxtCompressSuccess = DXTCompressor::CompressImageData(
226 DXT1,
227 outputNode->parameterList[parameterCountIndex].data.cptr,
228 outputNode->parameterList[parameterCountIndex].imageWidth,
229 outputNode->parameterList[parameterCountIndex].imageHeight,
230 outputData+ddsHeaderSize, false, outputNode->parameterList[parameterCountIndex].sourceFormatIsBGRA );
231
232 if (dxtCompressSuccess)
233 {
234 rakFree_Ex(outputNode->parameterList[parameterCountIndex].data.cptr,__FILE__,__LINE__);
235 DXTCompressor::WriteDDSHeader(DXT1,
236 outputNode->parameterList[parameterCountIndex].imageWidth,
237 outputNode->parameterList[parameterCountIndex].imageHeight,
238 bufferSize,
239 outputData);
240 outputNode->parameterList[parameterCountIndex].data.cptr=outputData;
241 outputNode->parameterList[parameterCountIndex].size=bufferSize + ddsHeaderSize;
242
243 // static bool testWriteToDisk=true;
244 // if (testWriteToDisk)
245 // {
246 // printf("Wrote test.dds\n");
247 // FILE *fp = fopen("test.dds", "wb");
248 // fwrite(outputData,1,outputNode->parameterList[parameterCountIndex].size,fp);
249 // fclose(fp);
250 // testWriteToDisk=false;
251 // }
252 }
253 else
254 {
255 rakFree_Ex(outputData,__FILE__,__LINE__);
256 }
257 }
258
259 if (dxtCompressSuccess==false)
260 {
261 if (outputNode->parameterList[parameterCountIndex].sourceFormatIsBGRA)
262 {
263 // Endian swap each color component. Input should be RGBA
264 // int pixelIndex;
265 // int rowIndex;
266 int imageHeight = outputNode->parameterList[parameterCountIndex].imageHeight;
267 // int imageWidth = outputNode->parameterList[parameterCountIndex].imageWidth;
268 int bytesPerPixel = outputNode->parameterList[parameterCountIndex].input_components;
269 int linePitch = outputNode->parameterList[parameterCountIndex].linePitch;
270 char *dataPtr = outputNode->parameterList[parameterCountIndex].data.cptr;
271 int totalBytes=linePitch*imageHeight;
272 char *endPtr = dataPtr+totalBytes;
273 unsigned char temp1;
274 if (bytesPerPixel==3)
275 {
276 while (dataPtr!=endPtr)
277 {
278 temp1=dataPtr[2];
279 dataPtr[2]=dataPtr[0];
280 dataPtr[0]=temp1;
281 dataPtr+=3;
282 }
283 }
284 else
285 {
286 RakAssert(bytesPerPixel==4);
287 while (dataPtr!=endPtr)
288 {
289 temp1=dataPtr[2];
290 dataPtr[2]=dataPtr[0];
291 dataPtr[0]=temp1;
292 dataPtr+=4;
293 }
294 }
295 }
296
297 CompressAsJpeg(
298 &outputNode->parameterList[parameterCountIndex].data.cptr,
299 &outputNode->parameterList[parameterCountIndex].size,
300 outputNode->parameterList[parameterCountIndex].imageWidth,
301 outputNode->parameterList[parameterCountIndex].imageHeight,
302 outputNode->parameterList[parameterCountIndex].linePitch,
303 outputNode->parameterList[parameterCountIndex].input_components
304 );
305 //
306 // static bool testWriteToDisk=true;
307 // if (testWriteToDisk)
308 // {
309 // printf("Wrote test.jpg\n");
310 // FILE *fp = fopen("test.jpg", "wb");
311 // fwrite(outputNode->parameterList[parameterCountIndex].data.cptr,1,outputNode->parameterList[parameterCountIndex].size,fp);
312 // fclose(fp);
313 // testWriteToDisk=false;
314 // }
315
316 }
317 }
318 }
319 }
320
321 // printf("5. out1, ");
322 RakNet::OP_DELETE(cpuThreadInput,__FILE__,__LINE__);
323 // printf("6. out2\n");
324 return cpuThreadOutput;
325 }
326 struct SQLPreparedStatements
327 {
SQLPreparedStatementsSQLPreparedStatements328 SQLPreparedStatements()
329 {
330 selectNameFromMaster=0;
331 insertIntoFunctionCalls=0;
332 insertIntoFunctionCallParameters=0;
333 }
334 sqlite3_stmt *selectNameFromMaster;
335 sqlite3_stmt *insertIntoFunctionCalls;
336 sqlite3_stmt *insertIntoFunctionCallParameters;
337 };
SQLLoggerThreadAllocPreparedStatements()338 void* SQLLoggerThreadAllocPreparedStatements()
339 {
340 SQLPreparedStatements *s = RakNet::OP_NEW<SQLPreparedStatements>(__FILE__,__LINE__);
341 return s;
342 }
SQLLoggerThreadDeallocPreparedStatements(void * statementStruct)343 void SQLLoggerThreadDeallocPreparedStatements(void* statementStruct)
344 {
345 SQLPreparedStatements *s = (SQLPreparedStatements *) statementStruct;
346 if (s->selectNameFromMaster) sqlite3_finalize(s->selectNameFromMaster);
347 if (s->insertIntoFunctionCalls) sqlite3_finalize(s->insertIntoFunctionCalls);
348 if (s->insertIntoFunctionCallParameters) sqlite3_finalize(s->insertIntoFunctionCallParameters);
349 RakNet::OP_DELETE(s,__FILE__,__LINE__);
350 }
DeleteBlobOrText(void * v)351 void DeleteBlobOrText(void* v)
352 {
353 LogParameter::Free(v);
354 }
ExecSQLLoggingThread(SQLiteServerLoggerPlugin::SQLThreadInput sqlThreadInput,bool * returnOutput,void * perThreadData)355 SQLiteServerLoggerPlugin::SQLThreadOutput ExecSQLLoggingThread(SQLiteServerLoggerPlugin::SQLThreadInput sqlThreadInput, bool *returnOutput, void* perThreadData)
356 {
357 SQLiteServerLoggerPlugin::CPUThreadOutputNode *cpuOutputNode = sqlThreadInput.cpuOutputNode;
358 SQLPreparedStatements *preparedStatements = (SQLPreparedStatements*) perThreadData;
359
360 *returnOutput=true;
361 SQLiteServerLoggerPlugin::SQLThreadOutput sqlThreadOutput;
362 sqlThreadOutput.cpuOutputNode=cpuOutputNode;
363 sqlite3 *dbHandle = sqlThreadInput.dbHandle;
364 // sqlite3_stmt *statement;
365 char *errorMsg;
366
367 sqlite3_exec(dbHandle,"BEGIN TRANSACTION", 0, 0, 0);
368
369 int rc;
370 if (cpuOutputNode->isFunctionCall)
371 {
372 if (preparedStatements->selectNameFromMaster==0)
373 {
374 // Create function tables if they are not there already
375 if (sqlite3_prepare_v2(
376 dbHandle,
377 "SELECT name FROM sqlite_master WHERE type='table' AND name="FUNCTION_CALL_TABLE" ",
378 -1,
379 &preparedStatements->selectNameFromMaster,
380 0
381 )!=SQLITE_OK)
382 {
383 RakAssert("Failed PRAGMA table_info for function tables in SQLiteServerLoggerPlugin.cpp" && 0);
384 for (int i=0; i < cpuOutputNode->parameterCount; i++)
385 cpuOutputNode->parameterList[i].Free();
386 sqlite3_exec(dbHandle,"END TRANSACTION", 0, 0, 0);
387 return sqlThreadOutput;
388 }
389 }
390 rc = sqlite3_step(preparedStatements->selectNameFromMaster);
391 sqlite3_reset(preparedStatements->selectNameFromMaster);
392
393 if (rc!=SQLITE_ROW)
394 {
395 // Create table if it isn't there already
396 rc = sqlite3_exec(dbHandle,"CREATE TABLE "FUNCTION_CALL_TABLE" (functionId_pk INTEGER PRIMARY KEY, "FUNCTION_CALL_FRIENDLY_TEXT" TEXT, functionName TEXT, "FILE_COLUMN" TEXT, "LINE_COLUMN" INTEGER, "TICK_COUNT_COLUMN" INTEGER, "AUTO_IP_COLUMN" TEXT, "TIMESTAMP_TEXT_COLUMN" TIMESTAMP DATE DEFAULT (datetime('now','localtime')), "TIMESTAMP_NUMERIC_COLUMN" NUMERIC )", 0, 0, &errorMsg);
397 RakAssert(rc==SQLITE_OK);
398 sqlite3_free(errorMsg);
399 // See sqlDataTypeNames for *val
400 rc = sqlite3_exec(dbHandle,"CREATE TABLE "FUNCTION_CALL_PARAMETERS_TABLE" (fcpId_pk INTEGER PRIMARY KEY, functionId_fk integer NOT NULL, value TEXT, FOREIGN KEY (functionId_fk) REFERENCES "FUNCTION_CALL_TABLE" (functionId_pk))", 0, 0, &errorMsg);
401 RakAssert(rc==SQLITE_OK);
402 sqlite3_free(errorMsg);
403 }
404 else
405 {
406 // Table already there
407 }
408
409 // Insert into function calls
410 int parameterCountIndex;
411 RakNet::RakString functionCallFriendlyText("%s(", cpuOutputNode->tableName.C_String());
412 for (parameterCountIndex=0; parameterCountIndex < cpuOutputNode->parameterCount; parameterCountIndex++)
413 {
414 if (parameterCountIndex!=0)
415 functionCallFriendlyText+=", ";
416 switch (cpuOutputNode->parameterList[parameterCountIndex].type)
417 {
418 case SQLLPDT_POINTER:
419 if (cpuOutputNode->parameterList[parameterCountIndex].size==4)
420 functionCallFriendlyText+=RakNet::RakString("%p", cpuOutputNode->parameterList[parameterCountIndex].data.i);
421 else
422 functionCallFriendlyText+=RakNet::RakString("%p", cpuOutputNode->parameterList[parameterCountIndex].data.ll);
423 break;
424 case SQLLPDT_INTEGER:
425 switch (cpuOutputNode->parameterList[parameterCountIndex].size)
426 {
427 case 1:
428 functionCallFriendlyText+=RakNet::RakString("%i", cpuOutputNode->parameterList[parameterCountIndex].data.c);
429 break;
430 case 2:
431 functionCallFriendlyText+=RakNet::RakString("%i", cpuOutputNode->parameterList[parameterCountIndex].data.s);
432 break;
433 case 4:
434 functionCallFriendlyText+=RakNet::RakString("%i", cpuOutputNode->parameterList[parameterCountIndex].data.i);
435 break;
436 case 8:
437 functionCallFriendlyText+=RakNet::RakString("%i", cpuOutputNode->parameterList[parameterCountIndex].data.ll);
438 break;
439 }
440 break;
441 case SQLLPDT_REAL:
442 if (cpuOutputNode->parameterList[parameterCountIndex].size==sizeof(float))
443 functionCallFriendlyText+=RakNet::RakString("%f", cpuOutputNode->parameterList[parameterCountIndex].data.f);
444 else
445 functionCallFriendlyText+=RakNet::RakString("%d", cpuOutputNode->parameterList[parameterCountIndex].data.d);
446 break;
447 case SQLLPDT_TEXT:
448 functionCallFriendlyText+='"';
449 if (cpuOutputNode->parameterList[parameterCountIndex].size>0)
450 functionCallFriendlyText.AppendBytes(cpuOutputNode->parameterList[parameterCountIndex].data.cptr, cpuOutputNode->parameterList[parameterCountIndex].size);
451 functionCallFriendlyText+='"';
452 break;
453 case SQLLPDT_IMAGE:
454 functionCallFriendlyText+=RakNet::RakString("<%i byte image>", cpuOutputNode->parameterList[parameterCountIndex].size, cpuOutputNode->parameterList[parameterCountIndex].data.cptr);
455 break;
456 case SQLLPDT_BLOB:
457 functionCallFriendlyText+=RakNet::RakString("<%i byte binary>", cpuOutputNode->parameterList[parameterCountIndex].size, cpuOutputNode->parameterList[parameterCountIndex].data.cptr);
458 break;
459 }
460 }
461
462 functionCallFriendlyText+=");";
463
464 if (preparedStatements->insertIntoFunctionCalls==0)
465 {
466 rc = sqlite3_prepare_v2(dbHandle, "INSERT INTO "FUNCTION_CALL_TABLE" ("FUNCTION_CALL_FRIENDLY_TEXT", "FILE_COLUMN", "LINE_COLUMN", "TICK_COUNT_COLUMN", "AUTO_IP_COLUMN", "TIMESTAMP_NUMERIC_COLUMN" ,functionName) VALUES (?,?,?,?,?,?,?)", -1, &preparedStatements->insertIntoFunctionCalls, 0);
467 if (rc!=SQLITE_DONE && rc!=SQLITE_OK)
468 {
469 RakAssert("Failed INSERT INTO "FUNCTION_CALL_PARAMETERS_TABLE" in SQLiteServerLoggerPlugin.cpp" && 0);
470 }
471 }
472 sqlite3_bind_text(preparedStatements->insertIntoFunctionCalls, 1, functionCallFriendlyText.C_String(), -1, SQLITE_TRANSIENT);
473 sqlite3_bind_text(preparedStatements->insertIntoFunctionCalls, 2, cpuOutputNode->file.C_String(), -1, SQLITE_TRANSIENT);
474 sqlite3_bind_int(preparedStatements->insertIntoFunctionCalls, 3, cpuOutputNode->line);
475 sqlite3_bind_int(preparedStatements->insertIntoFunctionCalls, 4, cpuOutputNode->tickCount);
476 sqlite3_bind_text(preparedStatements->insertIntoFunctionCalls, 5, cpuOutputNode->ipAddressString, -1, SQLITE_TRANSIENT);
477 sqlite3_bind_int(preparedStatements->insertIntoFunctionCalls, 6, (uint32_t) (cpuOutputNode->clientSendingTime));
478 sqlite3_bind_text(preparedStatements->insertIntoFunctionCalls, 7, cpuOutputNode->tableName.C_String(), -1, SQLITE_TRANSIENT);
479 rc = sqlite3_step(preparedStatements->insertIntoFunctionCalls);
480 sqlite3_reset(preparedStatements->insertIntoFunctionCalls);
481 if (rc!=SQLITE_DONE && rc!=SQLITE_OK)
482 {
483 RakAssert("Failed binding parameters to functionCalls in SQLiteServerLoggerPlugin.cpp" && 0);
484 }
485 // sqlite3_finalize(statement);
486
487 // RakNet::RakString insertIntoFunctionCallsQuery("INSERT INTO 'functionCalls' ("FUNCTION_CALL_FRIENDLY_TEXT", "FILE_COLUMN", "LINE_COLUMN", "TICK_COUNT_COLUMN",functionName) VALUES ('%s','%s',%i,%i,'%s') ", functionCallFriendlyText.C_String(), file.C_String(), line, tickCount,tableName.C_String());
488 // rc = sqlite3_exec(dbHandle,insertIntoFunctionCallsQuery.C_String(), 0, 0, &errorMsg);
489 // RakAssert(rc==SQLITE_OK);
490 // sqlite3_free(errorMsg);
491 // Read last row id
492 // Requires that this thread has its own connection
493 sqlite3_int64 lastRowId = sqlite3_last_insert_rowid(dbHandle);
494
495 if (preparedStatements->insertIntoFunctionCallParameters==0)
496 {
497 rc = sqlite3_prepare_v2(dbHandle, "INSERT INTO functionCallParameters (functionId_fk, value) VALUES (?,?);", -1, &preparedStatements->insertIntoFunctionCallParameters, 0);
498 RakAssert(rc==SQLITE_OK);
499 sqlite3_bind_int64(preparedStatements->insertIntoFunctionCallParameters, 1, lastRowId);
500 }
501
502 // Insert into parameters table
503 for (parameterCountIndex=0; parameterCountIndex < cpuOutputNode->parameterCount; parameterCountIndex++)
504 {
505 switch (cpuOutputNode->parameterList[parameterCountIndex].type)
506 {
507 case SQLLPDT_POINTER:
508 case SQLLPDT_INTEGER:
509 switch (cpuOutputNode->parameterList[parameterCountIndex].size)
510 {
511 case 1:
512 sqlite3_bind_int(preparedStatements->insertIntoFunctionCallParameters, 2, cpuOutputNode->parameterList[parameterCountIndex].data.c);
513 break;
514 case 2:
515 sqlite3_bind_int(preparedStatements->insertIntoFunctionCallParameters, 2, cpuOutputNode->parameterList[parameterCountIndex].data.s);
516 break;
517 case 4:
518 sqlite3_bind_int(preparedStatements->insertIntoFunctionCallParameters, 2, cpuOutputNode->parameterList[parameterCountIndex].data.i);
519 break;
520 case 8:
521 sqlite3_bind_int64(preparedStatements->insertIntoFunctionCallParameters, 2, cpuOutputNode->parameterList[parameterCountIndex].data.ll);
522 break;
523 }
524 break;
525 case SQLLPDT_REAL:
526 if (cpuOutputNode->parameterList[parameterCountIndex].size==sizeof(float))
527 sqlite3_bind_double(preparedStatements->insertIntoFunctionCallParameters, 2, cpuOutputNode->parameterList[parameterCountIndex].data.f);
528 else
529 sqlite3_bind_double(preparedStatements->insertIntoFunctionCallParameters, 2, cpuOutputNode->parameterList[parameterCountIndex].data.d);
530 break;
531 case SQLLPDT_TEXT:
532 sqlite3_bind_text(preparedStatements->insertIntoFunctionCallParameters, 2, cpuOutputNode->parameterList[parameterCountIndex].data.cptr, cpuOutputNode->parameterList[parameterCountIndex].size, 0);
533 break;
534 case SQLLPDT_IMAGE:
535 case SQLLPDT_BLOB:
536 sqlite3_bind_blob(preparedStatements->insertIntoFunctionCallParameters, 2, cpuOutputNode->parameterList[parameterCountIndex].data.vptr, cpuOutputNode->parameterList[parameterCountIndex].size, DeleteBlobOrText);
537 cpuOutputNode->parameterList[parameterCountIndex].DoNotFree();
538 break;
539 default:
540 RakAssert("Hit invalid default in case label in SQLiteServerLoggerPlugin.cpp" && 0);
541 }
542 rc = sqlite3_step(preparedStatements->insertIntoFunctionCallParameters);
543 sqlite3_reset(preparedStatements->insertIntoFunctionCallParameters);
544 if (rc!=SQLITE_DONE && rc!=SQLITE_OK)
545 {
546 RakAssert("Failed sqlite3_step to bind functionCall parameters in SQLiteServerLoggerPlugin.cpp" && 0);
547 }
548 }
549
550 // if (statement)
551 // sqlite3_finalize(statement);
552 }
553 else
554 {
555
556 sqlite3_stmt *pragmaTableInfo;
557 RakNet::RakString pragmaQuery("PRAGMA table_info(%s)",cpuOutputNode->tableName.C_String());
558 if (sqlite3_prepare_v2(
559 dbHandle,
560 pragmaQuery.C_String(),
561 -1,
562 &pragmaTableInfo,
563 0
564 )!=SQLITE_OK)
565 {
566 RakAssert("Failed PRAGMA table_info for tableName in SQLiteServerLoggerPlugin.cpp" && 0);
567 for (int i=0; i < cpuOutputNode->parameterCount; i++)
568 cpuOutputNode->parameterList[i].Free();
569 sqlite3_exec(dbHandle,"END TRANSACTION", 0, 0, 0);
570 return sqlThreadOutput;
571 }
572
573 int rc = sqlite3_step(pragmaTableInfo);
574 DataStructures::List<RakNet::RakString> existingColumnNames;
575 DataStructures::List<RakNet::RakString> existingColumnTypes;
576 char *errorMsg;
577 while (rc==SQLITE_ROW)
578 {
579 /*
580 int nameColumn;
581 for (int j=0; j < sqlite3_column_count(statement); j++)
582 {
583 if (strcmp(sqlite3_column_name(statement,j),"name")==0)
584 {
585 nameColumn=j;
586 break;
587 }
588 }
589 int typeColumn;
590 for (int j=0; j < sqlite3_column_count(statement); j++)
591 {
592 if (strcmp(sqlite3_column_name(statement,j),"type")==0)
593 {
594 typeColumn=j;
595 break;
596 }
597 }
598 */
599 const int nameColumn=1;
600 const int typeColumn=2;
601 RakAssert(strcmp(sqlite3_column_name(pragmaTableInfo,nameColumn),"name")==0);
602 RakAssert(strcmp(sqlite3_column_name(pragmaTableInfo,typeColumn),"type")==0);
603 RakNet::RakString columnName = sqlite3_column_text(pragmaTableInfo,nameColumn);
604 RakNet::RakString columnType = sqlite3_column_text(pragmaTableInfo,typeColumn);
605 existingColumnNames.Push(columnName, __FILE__, __LINE__ );
606 existingColumnTypes.Push(columnType, __FILE__, __LINE__ );
607
608 rc = sqlite3_step(pragmaTableInfo);
609 }
610 sqlite3_reset(pragmaTableInfo);
611 sqlite3_finalize(pragmaTableInfo);
612 if (rc==SQLITE_ERROR)
613 {
614 RakAssert("Failed sqlite3_step in SQLiteServerLoggerPlugin.cpp" && 0);
615 for (int i=0; i < cpuOutputNode->parameterCount; i++)
616 cpuOutputNode->parameterList[i].Free();
617 sqlite3_exec(dbHandle,"END TRANSACTION", 0, 0, 0);
618 return sqlThreadOutput;
619 }
620
621 int existingColumnNamesIndex,insertingColumnNamesIndex;
622 if (existingColumnNames.Size()==0)
623 {
624 RakNet::RakString createQuery("CREATE TABLE %s (rowId_pk INTEGER PRIMARY KEY, "FILE_COLUMN" TEXT, "LINE_COLUMN" INTEGER, "TICK_COUNT_COLUMN" INTEGER, "AUTO_IP_COLUMN" TEXT, "TIMESTAMP_TEXT_COLUMN" TIMESTAMP DATE DEFAULT (datetime('now','localtime')), "TIMESTAMP_NUMERIC_COLUMN" NUMERIC",cpuOutputNode->tableName.C_String());
625
626 for (int i=0; i < cpuOutputNode->parameterCount; i++)
627 {
628 createQuery+=", ";
629 createQuery+=cpuOutputNode->insertingColumnNames[i];
630 createQuery+=" ";
631 createQuery+=GetSqlDataTypeName2(cpuOutputNode->parameterList[i].type);
632 }
633 createQuery+=" )";
634
635 sqlite3_exec(dbHandle,
636 createQuery.C_String(),
637 0, 0, &errorMsg);
638 RakAssert(errorMsg==0);
639 sqlite3_free(errorMsg);
640 }
641 else
642 {
643 // Compare what is there (columnNames,columnTypes) to what we are adding. Add what is missing
644 bool alreadyExists;
645 for (insertingColumnNamesIndex=0; insertingColumnNamesIndex<(int) cpuOutputNode->insertingColumnNames.Size(); insertingColumnNamesIndex++)
646 {
647 alreadyExists=false;
648 for (existingColumnNamesIndex=0; existingColumnNamesIndex<(int) existingColumnNames.Size(); existingColumnNamesIndex++)
649 {
650 if (existingColumnNames[existingColumnNamesIndex]==cpuOutputNode->insertingColumnNames[insertingColumnNamesIndex])
651 {
652 // Type mismatch? If so, abort
653 if (existingColumnTypes[existingColumnNamesIndex]!=GetSqlDataTypeName2(cpuOutputNode->parameterList[insertingColumnNamesIndex].type))
654 {
655 printf("Error: Column type mismatch. TableName=%s. ColumnName%s. Existing=%s. New=%s\n",
656 cpuOutputNode->tableName.C_String(),
657 existingColumnNames[existingColumnNamesIndex].C_String(),
658 existingColumnTypes[existingColumnNamesIndex].C_String(),
659 GetSqlDataTypeName2(cpuOutputNode->parameterList[insertingColumnNamesIndex].type)
660 );
661 for (int i=0; i < cpuOutputNode->parameterCount; i++)
662 cpuOutputNode->parameterList[i].Free();
663 sqlite3_exec(dbHandle,"END TRANSACTION", 0, 0, 0);
664 return sqlThreadOutput;
665 }
666
667 alreadyExists=true;
668 break;
669 }
670 }
671
672 if (alreadyExists==false)
673 {
674 sqlite3_exec(dbHandle,
675 RakNet::RakString("ALTER TABLE %s ADD %s %s",
676 cpuOutputNode->tableName.C_String(),
677 cpuOutputNode->insertingColumnNames[insertingColumnNamesIndex].C_String(),
678 GetSqlDataTypeName2(cpuOutputNode->parameterList[insertingColumnNamesIndex].type)
679 ).C_String(),
680 0, 0, &errorMsg);
681 RakAssert(errorMsg==0);
682 sqlite3_free(errorMsg);
683 }
684 }
685 }
686
687
688
689 // Insert new row
690 RakNet::RakString insertQuery("INSERT INTO %s (", cpuOutputNode->tableName.C_String());
691 int parameterCountIndex;
692 for (parameterCountIndex=0; parameterCountIndex<cpuOutputNode->parameterCount; parameterCountIndex++)
693 {
694 if (parameterCountIndex!=0)
695 insertQuery+=", ";
696 insertQuery+=cpuOutputNode->insertingColumnNames[parameterCountIndex].C_String();
697 }
698 // Add file and line to the end
699 insertQuery+=", "FILE_COLUMN", "LINE_COLUMN", "TICK_COUNT_COLUMN", "AUTO_IP_COLUMN", "TIMESTAMP_NUMERIC_COLUMN" ) VALUES (";
700
701 for (parameterCountIndex=0; parameterCountIndex<cpuOutputNode->parameterCount+5; parameterCountIndex++)
702 {
703 if (parameterCountIndex!=0)
704 insertQuery+=", ?";
705 else
706 insertQuery+="?";
707 }
708 insertQuery+=")";
709
710 sqlite3_stmt *customStatement;
711 if (sqlite3_prepare_v2(
712 dbHandle,
713 insertQuery.C_String(),
714 -1,
715 &customStatement,
716 0
717 )!=SQLITE_OK)
718 {
719 RakAssert("Failed second sqlite3_prepare_v2 in SQLiteServerLoggerPlugin.cpp" && 0);
720 for (int i=0; i < cpuOutputNode->parameterCount; i++)
721 cpuOutputNode->parameterList[i].Free();
722 sqlite3_exec(dbHandle,"END TRANSACTION", 0, 0, 0);
723 return sqlThreadOutput;
724 }
725
726 for (parameterCountIndex=0; parameterCountIndex<cpuOutputNode->parameterCount; parameterCountIndex++)
727 {
728 switch (cpuOutputNode->parameterList[parameterCountIndex].type)
729 {
730 case SQLLPDT_POINTER:
731 case SQLLPDT_INTEGER:
732 switch (cpuOutputNode->parameterList[parameterCountIndex].size)
733 {
734 case 1:
735 sqlite3_bind_int(customStatement, parameterCountIndex+1, cpuOutputNode->parameterList[parameterCountIndex].data.c);
736 break;
737 case 2:
738 sqlite3_bind_int(customStatement, parameterCountIndex+1, cpuOutputNode->parameterList[parameterCountIndex].data.s);
739 break;
740 case 4:
741 sqlite3_bind_int(customStatement, parameterCountIndex+1, cpuOutputNode->parameterList[parameterCountIndex].data.i);
742 break;
743 case 8:
744 sqlite3_bind_int64(customStatement, parameterCountIndex+1, cpuOutputNode->parameterList[parameterCountIndex].data.ll);
745 break;
746 }
747 break;
748 case SQLLPDT_REAL:
749 if (cpuOutputNode->parameterList[parameterCountIndex].size==sizeof(float))
750 sqlite3_bind_double(customStatement, parameterCountIndex+1, cpuOutputNode->parameterList[parameterCountIndex].data.f);
751 else
752 sqlite3_bind_double(customStatement, parameterCountIndex+1, cpuOutputNode->parameterList[parameterCountIndex].data.d);
753 break;
754 case SQLLPDT_TEXT:
755 sqlite3_bind_text(customStatement, parameterCountIndex+1, cpuOutputNode->parameterList[parameterCountIndex].data.cptr, cpuOutputNode->parameterList[parameterCountIndex].size, DeleteBlobOrText);
756 cpuOutputNode->parameterList[parameterCountIndex].DoNotFree();
757 break;
758 case SQLLPDT_IMAGE:
759 case SQLLPDT_BLOB:
760 sqlite3_bind_blob(customStatement, parameterCountIndex+1, cpuOutputNode->parameterList[parameterCountIndex].data.vptr, cpuOutputNode->parameterList[parameterCountIndex].size, DeleteBlobOrText);
761 cpuOutputNode->parameterList[parameterCountIndex].DoNotFree();
762 break;
763 }
764 }
765
766 // Add file and line to the end
767 sqlite3_bind_text(customStatement, parameterCountIndex+1, cpuOutputNode->file.C_String(), (int) cpuOutputNode->file.GetLength(), SQLITE_TRANSIENT);
768 sqlite3_bind_int(customStatement, parameterCountIndex+2, cpuOutputNode->line);
769 sqlite3_bind_int(customStatement, parameterCountIndex+3, cpuOutputNode->tickCount);
770 sqlite3_bind_text(customStatement, parameterCountIndex+4, cpuOutputNode->ipAddressString, -1, SQLITE_TRANSIENT);
771 sqlite3_bind_int(customStatement, parameterCountIndex+5, (uint32_t) (cpuOutputNode->clientSendingTime));
772
773 rc = sqlite3_step(customStatement);
774 if (rc!=SQLITE_DONE && rc!=SQLITE_OK)
775 {
776 RakAssert("Failed sqlite3_step to bind blobs in SQLiteServerLoggerPlugin.cpp" && 0);
777 for (int i=0; i < cpuOutputNode->parameterCount; i++)
778 cpuOutputNode->parameterList[i].Free();
779 sqlite3_exec(dbHandle,"END TRANSACTION", 0, 0, 0);
780 return sqlThreadOutput;
781 }
782 sqlite3_finalize(customStatement);
783 }
784 sqlite3_exec(dbHandle,"END TRANSACTION", 0, 0, 0);
785
786 for (int i=0; i < cpuOutputNode->parameterCount; i++)
787 cpuOutputNode->parameterList[i].Free();
788
789 return sqlThreadOutput;
790 }
791
SQLiteServerLoggerPlugin()792 SQLiteServerLoggerPlugin::SQLiteServerLoggerPlugin()
793 {
794 sessionManagementMode=CREATE_EACH_NAMED_DB_HANDLE;
795 createDirectoryForFile=true;
796 cpuThreadInput=0;
797 dxtCompressionEnabled=false;
798 }
799
~SQLiteServerLoggerPlugin()800 SQLiteServerLoggerPlugin::~SQLiteServerLoggerPlugin()
801 {
802 StopCPUSQLThreads();
803 RakNet::OP_DELETE(cpuThreadInput,__FILE__,__LINE__);
804 CloseAllSessions();
805 }
Update(void)806 void SQLiteServerLoggerPlugin::Update(void)
807 {
808 SQLite3ServerPlugin::Update();
809
810 bool hadOutput=false;
811
812 int arrayIndex;
813 // unsigned int i;
814
815 PushCpuThreadInputIfNecessary();
816
817 while (cpuLoggerThreadPool.HasOutputFast() && cpuLoggerThreadPool.HasOutput())
818 {
819 CPUThreadOutput* cpuThreadOutput=cpuLoggerThreadPool.GetOutput();
820 for (arrayIndex=0; arrayIndex < cpuThreadOutput->arraySize; arrayIndex++)
821 {
822 SQLThreadInput sqlThreadInput;
823 CPUThreadOutputNode *outputNode = cpuThreadOutput->cpuOutputNodeArray[arrayIndex];
824 sqlThreadInput.cpuOutputNode=outputNode;
825 // bool alreadyHasLoggedInSession=false;
826 int sessionIndex;
827 for (sessionIndex=0; sessionIndex < loggedInSessions.Size(); sessionIndex++)
828 {
829 if (loggedInSessions[sessionIndex].systemAddress==outputNode->packet->systemAddress &&
830 loggedInSessions[sessionIndex].sessionName==outputNode->dbIdentifier)
831 {
832 break;
833 }
834 }
835
836 DataStructures::DefaultIndexType idx;
837 if (sessionManagementMode==USE_ANY_DB_HANDLE)
838 {
839 if (dbHandles.GetSize()>0)
840 idx=0;
841 else
842 idx=-1;
843 }
844 else
845 {
846 idx = dbHandles.GetIndexOf(outputNode->dbIdentifier);
847 if (sessionIndex==loggedInSessions.Size() && (createDirectoryForFile==true || idx==-1) && sessionManagementMode==CREATE_EACH_NAMED_DB_HANDLE)
848 {
849 // Create it, and set idx to the new one
850 idx = CreateDBHandle(outputNode->dbIdentifier);
851 }
852 else if (idx==-1)
853 {
854 if (sessionManagementMode==CREATE_EACH_NAMED_DB_HANDLE || sessionManagementMode==CREATE_SHARED_NAMED_DB_HANDLE)
855 {
856 // Create it, and set idx to the new one
857 idx = CreateDBHandle(outputNode->dbIdentifier);
858 }
859 if (sessionManagementMode==USE_NAMED_DB_HANDLE)
860 {
861 RakAssert("Can't find named DB handle\n" && 0);
862 }
863 }
864 else
865 {
866 // Use idx
867 }
868 }
869
870 if (idx==-1)
871 {
872 DeallocPacketUnified(outputNode->packet);
873 RakNet::OP_DELETE(outputNode,__FILE__,__LINE__);
874 }
875 else
876 {
877 if (sessionIndex==loggedInSessions.Size())
878 {
879 SessionNameAndSystemAddress sassy;
880 sassy.sessionName=outputNode->dbIdentifier;
881 sassy.systemAddress=outputNode->packet->systemAddress;
882 sassy.referencedPointer=dbHandles[idx].dbHandle;
883 RakNetTimeMS curTime = RakNet::GetTimeMS();
884 RakNetTimeMS dbAge = curTime - dbHandles[idx].whenCreated;
885 // RakNetTimeMS timeDelta = outputNode->clientSendingTime - curTime;
886 sassy.timestampDelta=dbAge - outputNode->clientSendingTime ;
887 // sassy.dbAgeWhenCreated=dbHandles[idx].whenCreated;
888 loggedInSessions.Push(sassy, __FILE__, __LINE__ );
889 sessionIndex=loggedInSessions.Size()-1;
890 }
891
892 DeallocPacketUnified(outputNode->packet);
893 sqlThreadInput.dbHandle=dbHandles[idx].dbHandle;
894 outputNode->clientSendingTime+=loggedInSessions[sessionIndex].timestampDelta;
895 sqlLoggerThreadPool.AddInput(ExecSQLLoggingThread, sqlThreadInput);
896 }
897 }
898
899 // RakNet::OP_DELETE_ARRAY(cpuThreadOutput->cpuOutputNodeArray);
900 RakNet::OP_DELETE(cpuThreadOutput,__FILE__,__LINE__);
901 }
902
903 while (sqlLoggerThreadPool.HasOutputFast() && sqlLoggerThreadPool.HasOutput())
904 {
905 hadOutput=true;
906 RakNet::OP_DELETE(sqlLoggerThreadPool.GetOutput().cpuOutputNode,__FILE__,__LINE__);
907 }
908
909 if (hadOutput)
910 CloseUnreferencedSessions();
911 }
OnReceive(Packet * packet)912 PluginReceiveResult SQLiteServerLoggerPlugin::OnReceive(Packet *packet)
913 {
914 PluginReceiveResult prr = SQLite3ServerPlugin::OnReceive(packet);
915 if (prr!=RR_CONTINUE_PROCESSING)
916 return prr;
917
918 switch (packet->data[0])
919 {
920 case ID_SQLLITE_LOGGER:
921 {
922 RakNet::BitStream bitStream(packet->data, packet->length, false);
923 bitStream.IgnoreBytes(1);
924 RakNet::RakString dbIdentifier;
925 bitStream.Read(dbIdentifier);
926
927 if (sessionManagementMode==CREATE_EACH_NAMED_DB_HANDLE)
928 {
929 unsigned char senderAddr[32];
930 packet->systemAddress.ToString(true,(char*) senderAddr);
931 dbIdentifier+=':';
932 dbIdentifier+=senderAddr;
933 }
934
935 CPUThreadInput *ti = LockCpuThreadInput();
936 ti->cpuInputArray[ti->arraySize].packet=packet;
937 // ti->cpuInputArray[ti->arraySize].whenMessageArrived=RakNet::GetTimeMS();
938 ti->cpuInputArray[ti->arraySize].dbIdentifier=dbIdentifier;
939 UnlockCpuThreadInput();
940
941
942 /*
943 unsigned int i;
944 bool alreadyHasLoggedInSession=false;
945 for (i=0; i < loggedInSessions.Size(); i++)
946 {
947 if (loggedInSessions[i].systemAddress==packet->systemAddress &&
948 loggedInSessions[i].sessionName==dbIdentifier)
949 {
950 alreadyHasLoggedInSession=true;
951 break;
952 }
953 }
954
955 DataStructures::DefaultIndexType idx;
956 if (sessionManagementMode==USE_ANY_DB_HANDLE)
957 {
958 if (dbHandles.GetSize()>0)
959 idx=0;
960 else
961 idx=-1;
962 }
963 else
964 {
965 idx = dbHandles.GetIndexOf(dbIdentifier);
966 if (alreadyHasLoggedInSession==false && (createDirectoryForFile==true || idx==-1) && sessionManagementMode==CREATE_EACH_NAMED_DB_HANDLE)
967 {
968 // Create it, and set idx to the new one
969 idx = CreateDBHandle(dbIdentifier);
970 }
971 else if (idx==-1)
972 {
973 if (sessionManagementMode==CREATE_EACH_NAMED_DB_HANDLE || sessionManagementMode==CREATE_SHARED_NAMED_DB_HANDLE)
974 {
975 // Create it, and set idx to the new one
976 idx = CreateDBHandle(dbIdentifier);
977 }
978 if (sessionManagementMode==USE_NAMED_DB_HANDLE)
979 {
980 RakAssert("Can't find named DB handle\n" && 0);
981 }
982 }
983 else
984 {
985 // Use idx
986 }
987 }
988
989 if (idx==-1)
990 {
991 return RR_STOP_PROCESSING_AND_DEALLOCATE;
992 }
993
994 if (alreadyHasLoggedInSession==false)
995 {
996 SessionNameAndSystemAddress sassy;
997 sassy.sessionName=dbIdentifier;
998 sassy.systemAddress=packet->systemAddress;
999 sassy.referencedPointer=dbHandles[idx].dbHandle;
1000 loggedInSessions.Push(sassy);
1001 }
1002
1003 SQLExecThreadInput input;
1004 input.dbHandle=dbHandles[idx].dbHandle;
1005 input.packet=packet;
1006 input.whenMessageArrived=RakNet::GetTimeMS()-dbHandles[idx].whenCreated;
1007 __sqlThreadPool.AddInput(ExecSQLLoggingThread, input);
1008 // printf("Pending Queries: %i\n", __sqlThreadPool.InputSize());
1009 */
1010 return RR_STOP_PROCESSING;
1011 }
1012 }
1013 return RR_CONTINUE_PROCESSING;
1014 }
OnClosedConnection(SystemAddress systemAddress,RakNetGUID rakNetGUID,PI2_LostConnectionReason lostConnectionReason)1015 void SQLiteServerLoggerPlugin::OnClosedConnection(SystemAddress systemAddress, RakNetGUID rakNetGUID, PI2_LostConnectionReason lostConnectionReason )
1016 {
1017 RakNet::RakString removedSession;
1018 unsigned int i=0;
1019 while (i < loggedInSessions.Size())
1020 {
1021 if (loggedInSessions[i].systemAddress==systemAddress)
1022 {
1023 removedSession=loggedInSessions[i].sessionName;
1024 loggedInSessions.RemoveAtIndexFast(i);
1025
1026 /*
1027 unsigned int j;
1028 bool removedSessionReferenced=false;
1029 for (j=0; j < loggedInSessions.Size(); j++)
1030 {
1031 if (loggedInSessions[j].sessionName==removedSession)
1032 {
1033 removedSessionReferenced=true;
1034 break;
1035 }
1036 }
1037 if (removedSessionReferenced==false)
1038 {
1039 // if (__sqlThreadPool.InputSize()>0 || __sqlThreadPool.IsWorking())
1040 // return;
1041
1042 // RemoveDBHandle(removedSession, sessionManagementMode==CREATE_EACH_NAMED_DB_HANDLE||sessionManagementMode==CREATE_ONE_NAMED_DB_HANDLE);
1043 }
1044 */
1045 }
1046 else
1047 i++;
1048 }
1049
1050 CloseUnreferencedSessions();
1051 }
CloseUnreferencedSessions(void)1052 void SQLiteServerLoggerPlugin::CloseUnreferencedSessions(void)
1053 {
1054 DataStructures::List<sqlite3 *> sessionNames;
1055 unsigned int j;
1056 for (j=0; j < loggedInSessions.Size(); j++)
1057 {
1058 if (sessionNames.GetIndexOf(loggedInSessions[j].referencedPointer)==-1)
1059 sessionNames.Push(loggedInSessions[j].referencedPointer, __FILE__, __LINE__ );
1060 }
1061
1062 DataStructures::List<sqlite3*> unreferencedHandles;
1063 bool isReferenced;
1064 for (unsigned int i=0; i < dbHandles.GetSize(); i++)
1065 {
1066 if (dbHandles[i].dbAutoCreated)
1067 {
1068 j=0;
1069 isReferenced=false;
1070 for (j=0; j < sessionNames.Size(); j++)
1071 {
1072 if (sessionNames[j]==dbHandles[i].dbHandle)
1073 {
1074 isReferenced=true;
1075 break;
1076 }
1077 }
1078
1079 if (isReferenced==false)
1080 {
1081 unreferencedHandles.Push(dbHandles[i].dbHandle,__FILE__,__LINE__);
1082 }
1083 }
1084 }
1085
1086 if (unreferencedHandles.Size())
1087 {
1088 sqlLoggerThreadPool.LockInput();
1089 if (sqlLoggerThreadPool.HasInputFast()==false)
1090 {
1091 RakSleep(100);
1092 while (sqlLoggerThreadPool.NumThreadsWorking()>0)
1093 RakSleep(30);
1094 for (unsigned int k=0; k < unreferencedHandles.Size(); k++)
1095 {
1096 RemoveDBHandle(unreferencedHandles[k], true);
1097 }
1098 }
1099 sqlLoggerThreadPool.UnlockInput();
1100
1101 if (dbHandles.GetSize()==0)
1102 StopCPUSQLThreads();
1103 }
1104 }
CloseAllSessions(void)1105 void SQLiteServerLoggerPlugin::CloseAllSessions(void)
1106 {
1107 loggedInSessions.Clear(false, __FILE__, __LINE__);
1108 CloseUnreferencedSessions();
1109 }
CreateDBHandle(RakNet::RakString dbIdentifier)1110 unsigned int SQLiteServerLoggerPlugin::CreateDBHandle(RakNet::RakString dbIdentifier)
1111 {
1112 if (sessionManagementMode!=CREATE_EACH_NAMED_DB_HANDLE && sessionManagementMode!=CREATE_SHARED_NAMED_DB_HANDLE)
1113 return dbHandles.GetIndexOf(dbIdentifier);
1114
1115 RakNet::RakString filePath = newDatabaseFilePath;
1116 if (createDirectoryForFile)
1117 {
1118 filePath+=dbIdentifier;
1119 filePath.TerminateAtLastCharacter('.');
1120 filePath.MakeFilePath();
1121
1122 time_t now;
1123 struct tm *ts;
1124 char buf[80];
1125
1126 /* Get the current time */
1127 now = time(NULL);
1128
1129 /* Format and print the time, "ddd yyyy-mm-dd hh:mm:ss zzz" */
1130 ts = localtime(&now);
1131 strftime(buf, sizeof(buf), "__%a_%Y-%m-%d__%H;%M", ts);
1132
1133 filePath+=buf;
1134 filePath+=RakNet::RakString("__%i", RakNet::GetTimeMS());
1135
1136 filePath.MakeFilePath();
1137 }
1138
1139
1140 // With no file data, just creates the directory structure
1141 WriteFileWithDirectories(filePath.C_String(), 0, 0);
1142
1143 RakNet::RakString fileSafeDbIdentifier = dbIdentifier;
1144 fileSafeDbIdentifier.TerminateAtLastCharacter(':');
1145 RakNet::RakString fileNameWithPath=filePath+fileSafeDbIdentifier;
1146
1147 // SQL Open this file, and register it
1148 sqlite3 *database;
1149 if (sqlite3_open_v2(fileNameWithPath.C_String(), &database, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, 0)!=SQLITE_OK)
1150 {
1151 RakAssert("sqlite3_open_v2 failed in SQLiteServerLoggerPlugin.cpp" && 0);
1152 return -1;
1153 }
1154 if (AddDBHandle(dbIdentifier, database, true))
1155 {
1156
1157 char *errorMsg;
1158 int rc = sqlite3_exec(database,"PRAGMA synchronous=OFF", 0, 0, &errorMsg);
1159 RakAssert(rc==SQLITE_OK);
1160 sqlite3_free(errorMsg);
1161 rc = sqlite3_exec(database,"PRAGMA count_changes=OFF", 0, 0, &errorMsg);
1162 RakAssert(rc==SQLITE_OK);
1163 sqlite3_free(errorMsg);
1164
1165 printf("Created %s\n", fileNameWithPath.C_String());
1166 return dbHandles.GetIndexOf(dbIdentifier);
1167 }
1168 else
1169 {
1170 RakAssert("Failed to call AddDbHandle" && 0);
1171 return -1;
1172 }
1173 return -1;
1174 }
SetSessionManagementMode(SessionManagementMode _sessionManagementMode,bool _createDirectoryForFile,const char * _newDatabaseFilePath)1175 void SQLiteServerLoggerPlugin::SetSessionManagementMode(SessionManagementMode _sessionManagementMode, bool _createDirectoryForFile, const char *_newDatabaseFilePath)
1176 {
1177 sessionManagementMode=_sessionManagementMode;
1178 createDirectoryForFile=_createDirectoryForFile;
1179 newDatabaseFilePath=_newDatabaseFilePath;
1180 newDatabaseFilePath.MakeFilePath();
1181 }
OnShutdown(void)1182 void SQLiteServerLoggerPlugin::OnShutdown(void)
1183 {
1184 CloseAllSessions();
1185 }
StopCPUSQLThreads(void)1186 void SQLiteServerLoggerPlugin::StopCPUSQLThreads(void)
1187 {
1188 unsigned int i;
1189 int j,k;
1190 cpuLoggerThreadPool.StopThreads();
1191 ClearCpuThreadInput();
1192 for (i=0; i < cpuLoggerThreadPool.InputSize(); i++)
1193 {
1194 CPUThreadInput *cpuThreadInput = cpuLoggerThreadPool.GetInputAtIndex(i);
1195 for (j=0; j < cpuThreadInput->arraySize; j++)
1196 {
1197 DeallocPacketUnified(cpuThreadInput->cpuInputArray[j].packet);
1198 }
1199 RakNet::OP_DELETE(cpuThreadInput,__FILE__,__LINE__);
1200 }
1201 cpuLoggerThreadPool.ClearInput();
1202 for (i=0; i < cpuLoggerThreadPool.OutputSize(); i++)
1203 {
1204 CPUThreadOutput *cpuThreadOutput = cpuLoggerThreadPool.GetOutputAtIndex(i);
1205 for (j=0; j < cpuThreadOutput->arraySize; j++)
1206 {
1207 CPUThreadOutputNode *cpuThreadOutputNode = cpuThreadOutput->cpuOutputNodeArray[j];
1208 DeallocPacketUnified(cpuThreadOutputNode->packet);
1209 for (k=0; k < cpuThreadOutputNode->parameterCount; k++)
1210 cpuThreadOutputNode->parameterList[k].Free();
1211 RakNet::OP_DELETE(cpuThreadOutputNode,__FILE__,__LINE__);
1212 }
1213 // RakNet::OP_DELETE_ARRAY(cpuThreadOutput->cpuOutputNodeArray,__FILE__,__LINE__);
1214 RakNet::OP_DELETE(cpuThreadOutput,__FILE__,__LINE__);
1215 }
1216 cpuLoggerThreadPool.ClearOutput();
1217
1218 sqlLoggerThreadPool.StopThreads();
1219 for (i=0; i < sqlLoggerThreadPool.InputSize(); i++)
1220 RakNet::OP_DELETE(sqlLoggerThreadPool.GetInputAtIndex(i).cpuOutputNode,__FILE__,__LINE__);
1221 sqlLoggerThreadPool.ClearInput();
1222 for (i=0; i < sqlLoggerThreadPool.OutputSize(); i++)
1223 RakNet::OP_DELETE(sqlLoggerThreadPool.GetOutputAtIndex(i).cpuOutputNode,__FILE__,__LINE__);
1224 sqlLoggerThreadPool.ClearOutput();
1225 }
GetProcessingStatus(ProcessingStatus * processingStatus)1226 void SQLiteServerLoggerPlugin::GetProcessingStatus(ProcessingStatus *processingStatus)
1227 {
1228 if (cpuThreadInput)
1229 processingStatus->packetsBuffered=cpuThreadInput->arraySize;
1230 else
1231 processingStatus->packetsBuffered=0;
1232 processingStatus->cpuPendingProcessing=cpuLoggerThreadPool.InputSize();
1233 processingStatus->cpuProcessedAwaitingDeallocation=cpuLoggerThreadPool.OutputSize();
1234 processingStatus->cpuNumThreadsWorking=cpuLoggerThreadPool.NumThreadsWorking();
1235 processingStatus->sqlPendingProcessing=sqlLoggerThreadPool.InputSize();
1236 processingStatus->sqlProcessedAwaitingDeallocation=sqlLoggerThreadPool.OutputSize();
1237 processingStatus->sqlNumThreadsWorking=sqlLoggerThreadPool.NumThreadsWorking();
1238 }
1239
LockCpuThreadInput(void)1240 SQLiteServerLoggerPlugin::CPUThreadInput *SQLiteServerLoggerPlugin::LockCpuThreadInput(void)
1241 {
1242 if (cpuThreadInput==0)
1243 {
1244 cpuThreadInput=RakNet::OP_NEW<CPUThreadInput>(__FILE__,__LINE__);
1245 cpuThreadInput->arraySize=0;
1246 whenCpuThreadInputAllocated=RakNet::GetTime();
1247 }
1248 return cpuThreadInput;
1249 }
ClearCpuThreadInput(void)1250 void SQLiteServerLoggerPlugin::ClearCpuThreadInput(void)
1251 {
1252 if (cpuThreadInput!=0)
1253 {
1254 for (int i=0; i < cpuThreadInput->arraySize; i++)
1255 DeallocPacketUnified(cpuThreadInput->cpuInputArray[i].packet);
1256 RakNet::OP_DELETE(cpuThreadInput,__FILE__,__LINE__);
1257 cpuThreadInput=0;
1258 }
1259 }
UnlockCpuThreadInput(void)1260 void SQLiteServerLoggerPlugin::UnlockCpuThreadInput(void)
1261 {
1262 cpuThreadInput->arraySize++;
1263 if (cpuThreadInput->arraySize==MAX_PACKETS_PER_CPU_INPUT_THREAD)
1264 PushCpuThreadInput();
1265 else
1266 PushCpuThreadInputIfNecessary();
1267 }
1268
CancelLockCpuThreadInput(void)1269 void SQLiteServerLoggerPlugin::CancelLockCpuThreadInput(void)
1270 {
1271 if (cpuThreadInput->arraySize==0)
1272 {
1273 RakNet::OP_DELETE(cpuThreadInput,__FILE__,__LINE__);
1274 cpuThreadInput=0;
1275 }
1276 }
PushCpuThreadInput(void)1277 void SQLiteServerLoggerPlugin::PushCpuThreadInput(void)
1278 {
1279 // cpu threads can probably be as many as I want
1280 if (cpuLoggerThreadPool.WasStarted()==false)
1281 {
1282 if (dxtCompressionEnabled)
1283 cpuLoggerThreadPool.StartThreads(1,0,InitDxt, DeinitDxt);
1284 else
1285 cpuLoggerThreadPool.StartThreads(1,0,0, 0);
1286 }
1287 // sql logger threads should probably be limited to 1 since I'm doing transaction locks and calling sqlite3_last_insert_rowid
1288 if (sqlLoggerThreadPool.WasStarted()==false)
1289 sqlLoggerThreadPool.StartThreads(1,0, SQLLoggerThreadAllocPreparedStatements, SQLLoggerThreadDeallocPreparedStatements);
1290
1291 cpuLoggerThreadPool.AddInput(ExecCPULoggingThread, cpuThreadInput);
1292 cpuThreadInput=0;
1293 }
PushCpuThreadInputIfNecessary(void)1294 void SQLiteServerLoggerPlugin::PushCpuThreadInputIfNecessary(void)
1295 {
1296 RakNetTime curTime = RakNet::GetTime();
1297 if (cpuThreadInput && curTime-whenCpuThreadInputAllocated>MAX_TIME_TO_BUFFER_PACKETS)
1298 PushCpuThreadInput();
1299 }
OnAttach(void)1300 void SQLiteServerLoggerPlugin::OnAttach(void)
1301 {
1302 }
OnDetach(void)1303 void SQLiteServerLoggerPlugin::OnDetach(void)
1304 {
1305 }
SetEnableDXTCompression(bool enable)1306 void SQLiteServerLoggerPlugin::SetEnableDXTCompression(bool enable)
1307 {
1308 dxtCompressionEnabled=enable;
1309 }
1310