1 #include "SQLiteServerLoggerPlugin.h"
2 #include "RakPeerInterface.h"
3 #include "PacketizedTCP.h"
4 #include "MessageIdentifiers.h"
5 #include "SQLiteLoggerCommon.h"
6 #include "jpeglib.h"
7 #include "jpeg_memory_dest.h"
8 #include "FileOperations.h"
9 #include "GetTime.h"
10 #include <time.h>
11 #include <stdio.h>
12 #include <sys/types.h>
13 #include <sys/timeb.h>
14 #include "DXTCompressor.h"
15 
16 // http://web.utk.edu/~jplyon/sqlite/SQLite_optimization_FAQ.html
17 
18 // See jmorecfg.h
19 /*
20 * Ordering of RGB data in scanlines passed to or from the application.
21 * If your application wants to deal with data in the order B,G,R, just
22 * change these macros.  You can also deal with formats such as R,G,B,X
23 * (one extra byte per pixel) by changing RGB_PIXELSIZE.  Note that changing
24 * the offsets will also change the order in which colormap data is organized.
25 * RESTRICTIONS:
26 * 1. The sample applications cjpeg,djpeg do NOT support modified RGB formats.
27 * 2. These macros only affect RGB<=>YCbCr color conversion, so they are not
28 *    useful if you are using JPEG color spaces other than YCbCr or grayscale.
29 * 3. The color quantizer modules will not behave desirably if RGB_PIXELSIZE
30 *    is not 3 (they don't understand about dummy color components!).  So you
31 *    can't use color quantization if you change that value.
32 */
33 
34 //#define RGB_RED		0	/* Offset of Red in an RGB scanline element */
35 //#define RGB_GREEN	1	/* Offset of Green */
36 //#define RGB_BLUE	2	/* Offset of Blue */
37 //#define RGB_PIXELSIZE	3	/* JSAMPLEs per RGB scanline element */
38 
39 using namespace RakNet;
40 
41 // JPEG ENCODING ERRORS
42 struct my_error_mgr {
43 	struct jpeg_error_mgr pub;
44 };
my_error_exit(j_common_ptr cinfo)45 METHODDEF(void) my_error_exit (j_common_ptr cinfo) {}
46 
47 #define FILE_COLUMN "SourceFile"
48 #define LINE_COLUMN "SourceLine"
49 #define TICK_COUNT_COLUMN "AutoTickCount"
50 #define AUTO_IP_COLUMN "AutoIPAddress"
51 #define TIMESTAMP_NUMERIC_COLUMN "TimestampNumeric"
52 #define TIMESTAMP_TEXT_COLUMN "TimestampText"
53 #define FUNCTION_CALL_FRIENDLY_TEXT "FunctionCallFriendlyText"
54 #define FUNCTION_CALL_TABLE "'functionCalls'"
55 #define FUNCTION_CALL_PARAMETERS_TABLE "'functionCallParameters'"
56 
57 // Store packets in the CPUThreadInput until at most this much time has elapsed. This is so
58 // batch processing can occur on multiple image sources at once
59 static const RakNetTime MAX_TIME_TO_BUFFER_PACKETS=1000;
60 
61 // WTF am I getting this?
62 // 2>SQLiteServerLoggerPlugin.obj : error LNK2019: unresolved external symbol _GetSqlDataTypeName referenced in function "struct RakNet::SQLite3ServerPlugin::SQLExecThreadOutput __cdecl ExecSQLLoggingThread(struct RakNet::SQLite3ServerPlugin::SQLExecThreadInput,bool *,void *)" (?ExecSQLLoggingThread@@YA?AUExecThreadOutput@SQLite3ServerPlugin@RakNet@@UExecThreadInput@23@PA_NPAX@Z)
63 // 2>C:\RakNet\Debug\SQLiteServerLogger.exe : fatal error LNK1120: 1 unresolved externals
64 static const char *sqlDataTypeNames[SQLLPDT_COUNT] =
65 {
66 	"INTEGER",
67 	"INTEGER",
68 	"NUMERIC",
69 	"TEXT",
70 	"BLOB",
71 	"BLOB",
72 };
GetSqlDataTypeName2(SQLLoggerPrimaryDataType idx)73 const char *GetSqlDataTypeName2(SQLLoggerPrimaryDataType idx) {return sqlDataTypeNames[(int)idx];}
74 
CompressAsJpeg(char ** cptrInOut,uint32_t * sizeInOut,uint16_t imageWidth,uint16_t imageHeight,int16_t linePitch,unsigned char input_components)75 void CompressAsJpeg(char **cptrInOut, uint32_t *sizeInOut, uint16_t imageWidth, uint16_t imageHeight, int16_t linePitch, unsigned char input_components)
76 {
77 	RakNetTimeUS t1=RakNet::GetTimeUS();
78 
79 	// Compress to jpg
80 	// http://www.google.com/codesearch/p?hl=en#I-_InJ6STRE/gthumb-1.108/libgthumb/pixbuf-utils.c&q=jpeg_create_compress
81 	// http://ftp.gnome.org	/ pub/	GNOME	/	sources	/gthumb	/1.108/	gthumb-1.108.tar.gz/
82 
83 	struct jpeg_compress_struct cinfo;
84 	struct my_error_mgr jerr;
85 	cinfo.err = jpeg_std_error(&jerr.pub);
86 	jerr.pub.error_exit = my_error_exit;
87 	jpeg_create_compress(&cinfo);
88 	cinfo.smoothing_factor = 0;
89 	cinfo.optimize_coding = false;
90 	cinfo.image_width      = imageWidth;
91 	cinfo.image_height     = imageHeight;
92 	cinfo.input_components = input_components;
93 	cinfo.in_color_space = JCS_RGB;
94 
95 
96 	jpeg_set_defaults (&cinfo);
97 	cinfo.dct_method=JDCT_FLOAT;
98 	// Not sure why they made RGB_PIXELSIZE a global define. I added this so it only uses my value in this one place, to not break other things
99 	cinfo.hack_use_input_components_as_RGB_PIXELSIZE=1;
100 	jpeg_set_quality (&cinfo, 75, TRUE);
101 	int jpegSizeAfterCompression = 0; //size of jpeg after compression
102 	char * storage = (char*) rakMalloc_Ex(*sizeInOut+50000,__FILE__,__LINE__);
103 	jpeg_memory_dest(&cinfo,(JOCTET*)storage,*sizeInOut+50000,&jpegSizeAfterCompression);
104 
105 	JSAMPROW row_pointer[1];
106 	jpeg_start_compress (&cinfo, TRUE);
107 	while( cinfo.next_scanline < cinfo.image_height )
108 	{
109 		row_pointer[0] = (JSAMPROW) &((*cptrInOut)[cinfo.next_scanline * linePitch ]);
110 		jpeg_write_scanlines(&cinfo, row_pointer, 1);
111 
112 	}
113 
114 	/* finish off */
115 	jpeg_finish_compress (&cinfo);
116 	jpeg_destroy_compress(&cinfo);
117 
118 	rakFree_Ex(*cptrInOut,__FILE__,__LINE__);
119 	*cptrInOut = (char*) rakRealloc_Ex(storage, jpegSizeAfterCompression,__FILE__,__LINE__);
120 	*sizeInOut=jpegSizeAfterCompression;
121 
122 	RakNetTimeUS t2=RakNet::GetTimeUS();
123 	RakNetTimeUS diff=t2-t1;
124 }
125 static bool needsDxtInit=true;
126 static bool dxtCompressionSupported=false;
InitDxt()127 void* InitDxt()
128 {
129 	if (needsDxtInit==true)
130 	{
131 		dxtCompressionSupported=DXTCompressor::Initialize();
132 		if (dxtCompressionSupported==false)
133 		{
134 			printf("Warning, DXT compressor failed to start.\nImages will be compressed with jpg instead.\n");
135 		}
136 	}
137 	needsDxtInit=false;
138 	return 0;
139 }
DeinitDxt(void *)140 void DeinitDxt(void*)
141 {
142 	if (dxtCompressionSupported)
143 		DXTCompressor::Shutdown();
144 	needsDxtInit=true;
145 	dxtCompressionSupported=false;
146 }
ExecCPULoggingThread(SQLiteServerLoggerPlugin::CPUThreadInput * cpuThreadInput,bool * returnOutput,void * perThreadData)147 SQLiteServerLoggerPlugin::CPUThreadOutput* ExecCPULoggingThread(SQLiteServerLoggerPlugin::CPUThreadInput* cpuThreadInput, bool *returnOutput, void* perThreadData)
148 {
149 	int i;
150 	*returnOutput=true;
151 	SQLiteServerLoggerPlugin::CPUThreadOutput *cpuThreadOutput = RakNet::OP_NEW<SQLiteServerLoggerPlugin::CPUThreadOutput>(__FILE__,__LINE__);
152 	cpuThreadOutput->arraySize=cpuThreadInput->arraySize;
153 	//cpuThreadOutput->cpuOutputNodeArray=RakNet::OP_NEW_ARRAY<SQLiteServerLoggerPlugin::CPUThreadOutputNode*>(cpuThreadInput->arraySize,__FILE__,__LINE__);
154 	//printf("1. arraySize=%i, ",cpuThreadInput->arraySize);
155 	for (i=0; i<cpuThreadInput->arraySize; i++)
156 	{
157 		cpuThreadOutput->cpuOutputNodeArray[i]=RakNet::OP_NEW<SQLiteServerLoggerPlugin::CPUThreadOutputNode>(__FILE__,__LINE__);
158 		SQLiteServerLoggerPlugin::CPUThreadOutputNode *outputNode = cpuThreadOutput->cpuOutputNodeArray[i];
159 		Packet *packet = cpuThreadInput->cpuInputArray[i].packet;
160 		RakNet::RakString dbIdentifier = cpuThreadInput->cpuInputArray[i].dbIdentifier;
161 		// outputNode->whenMessageArrived = cpuThreadInput->cpuInputArray[i].whenMessageArrived;
162 		outputNode->packet=packet;
163 
164 		packet->systemAddress.ToString(true,outputNode->ipAddressString);
165 		RakNet::BitStream bitStream(packet->data, packet->length, false);
166 		bitStream.IgnoreBytes(1);
167 		bitStream.Read(outputNode->dbIdentifier);
168 		bitStream.Read(outputNode->tableName);
169 		outputNode->tableName.SQLEscape();
170 		bitStream.Read(outputNode->line);
171 		bitStream.Read(outputNode->file);
172 		bitStream.Read(outputNode->tickCount);
173 		bitStream.Read(outputNode->clientSendingTime);
174 		bitStream.Read(outputNode->isFunctionCall);
175 		bitStream.Read(outputNode->parameterCount);
176 		if (outputNode->isFunctionCall==false)
177 		{
178 			RakNet::RakString columnName;
179 		//	printf("2. parameterCount=%i, ",outputNode->parameterCount);
180 			for (int i=0; i < outputNode->parameterCount; i++)
181 			{
182 				bitStream.Read(columnName);
183 				columnName.SQLEscape();
184 				columnName.RemoveCharacter(' ');
185 				outputNode->insertingColumnNames.Push(columnName, __FILE__, __LINE__ );
186 			}
187 		}
188 
189 		int parameterCountIndex=0;
190 	//	printf("3. parameterCount=%i, ",outputNode->parameterCount);
191 		while (parameterCountIndex < outputNode->parameterCount)
192 		{
193 			outputNode->parameterList[parameterCountIndex].Deserialize(&bitStream);
194 
195 			if (outputNode->parameterList[parameterCountIndex].size>0)
196 			{
197 				parameterCountIndex++;
198 			}
199 			else
200 			{
201 				// Skip empty parameters
202 				if (outputNode->isFunctionCall==false)
203 					outputNode->insertingColumnNames.RemoveAtIndex(parameterCountIndex);
204 				outputNode->parameterCount--;
205 			}
206 		}
207 
208 //		sqlite3_stmt *statement;
209 //		char *errorMsg;
210 	//	printf("4. parameterCount=%i, ",outputNode->parameterCount);
211 		for (parameterCountIndex=0; parameterCountIndex < outputNode->parameterCount; parameterCountIndex++)
212 		{
213 			if (outputNode->parameterList[parameterCountIndex].type==SQLLPDT_IMAGE)
214 			{
215 
216 				bool dxtCompressSuccess=false;
217 				if (dxtCompressionSupported)
218 				{
219 					char *outputData;
220 					int bufferSize = DXTCompressor::GetBufferSize(DXT1,
221 						outputNode->parameterList[parameterCountIndex].imageWidth,
222 						outputNode->parameterList[parameterCountIndex].imageHeight);
223 					int ddsHeaderSize = DXTCompressor::GetDDSHeaderSize();
224 					outputData = (char*) rakMalloc_Ex(bufferSize + ddsHeaderSize, __FILE__, __LINE__ );
225 					dxtCompressSuccess = DXTCompressor::CompressImageData(
226 					DXT1,
227 					outputNode->parameterList[parameterCountIndex].data.cptr,
228 					outputNode->parameterList[parameterCountIndex].imageWidth,
229 					outputNode->parameterList[parameterCountIndex].imageHeight,
230 					outputData+ddsHeaderSize, false, outputNode->parameterList[parameterCountIndex].sourceFormatIsBGRA );
231 
232 					if (dxtCompressSuccess)
233 					{
234 						rakFree_Ex(outputNode->parameterList[parameterCountIndex].data.cptr,__FILE__,__LINE__);
235 						DXTCompressor::WriteDDSHeader(DXT1,
236 							outputNode->parameterList[parameterCountIndex].imageWidth,
237 							outputNode->parameterList[parameterCountIndex].imageHeight,
238 							bufferSize,
239 							outputData);
240 						outputNode->parameterList[parameterCountIndex].data.cptr=outputData;
241 						outputNode->parameterList[parameterCountIndex].size=bufferSize + ddsHeaderSize;
242 
243 // 						static bool testWriteToDisk=true;
244 // 						if (testWriteToDisk)
245 // 						{
246 // 							printf("Wrote test.dds\n");
247 // 							FILE *fp = fopen("test.dds", "wb");
248 // 							fwrite(outputData,1,outputNode->parameterList[parameterCountIndex].size,fp);
249 // 							fclose(fp);
250 // 							testWriteToDisk=false;
251 // 						}
252 					}
253 					else
254 					{
255 						rakFree_Ex(outputData,__FILE__,__LINE__);
256 					}
257 				}
258 
259 				if (dxtCompressSuccess==false)
260 				{
261 					if (outputNode->parameterList[parameterCountIndex].sourceFormatIsBGRA)
262 					{
263 						// Endian swap each color component. Input should be RGBA
264 						//					int pixelIndex;
265 						//					int rowIndex;
266 						int imageHeight = outputNode->parameterList[parameterCountIndex].imageHeight;
267 						//					int imageWidth = outputNode->parameterList[parameterCountIndex].imageWidth;
268 						int bytesPerPixel = outputNode->parameterList[parameterCountIndex].input_components;
269 						int linePitch = outputNode->parameterList[parameterCountIndex].linePitch;
270 						char *dataPtr = outputNode->parameterList[parameterCountIndex].data.cptr;
271 						int totalBytes=linePitch*imageHeight;
272 						char *endPtr = dataPtr+totalBytes;
273 						unsigned char temp1;
274 						if (bytesPerPixel==3)
275 						{
276 							while (dataPtr!=endPtr)
277 							{
278 								temp1=dataPtr[2];
279 								dataPtr[2]=dataPtr[0];
280 								dataPtr[0]=temp1;
281 								dataPtr+=3;
282 							}
283 						}
284 						else
285 						{
286 							RakAssert(bytesPerPixel==4);
287 							while (dataPtr!=endPtr)
288 							{
289 								temp1=dataPtr[2];
290 								dataPtr[2]=dataPtr[0];
291 								dataPtr[0]=temp1;
292 								dataPtr+=4;
293 							}
294 						}
295 					}
296 
297 					CompressAsJpeg(
298 						&outputNode->parameterList[parameterCountIndex].data.cptr,
299 						&outputNode->parameterList[parameterCountIndex].size,
300 						outputNode->parameterList[parameterCountIndex].imageWidth,
301 						outputNode->parameterList[parameterCountIndex].imageHeight,
302 						outputNode->parameterList[parameterCountIndex].linePitch,
303 						outputNode->parameterList[parameterCountIndex].input_components
304 						);
305 //
306 // 						static bool testWriteToDisk=true;
307 // 						if (testWriteToDisk)
308 // 						{
309 // 							printf("Wrote test.jpg\n");
310 // 							FILE *fp = fopen("test.jpg", "wb");
311 // 							fwrite(outputNode->parameterList[parameterCountIndex].data.cptr,1,outputNode->parameterList[parameterCountIndex].size,fp);
312 // 							fclose(fp);
313 // 							testWriteToDisk=false;
314 // 						}
315 
316 				}
317 			}
318 		}
319 	}
320 
321 //	printf("5. out1, ");
322 	RakNet::OP_DELETE(cpuThreadInput,__FILE__,__LINE__);
323 //	printf("6. out2\n");
324 	return cpuThreadOutput;
325 }
326 struct SQLPreparedStatements
327 {
SQLPreparedStatementsSQLPreparedStatements328 	SQLPreparedStatements()
329 	{
330 		selectNameFromMaster=0;
331 		insertIntoFunctionCalls=0;
332 		insertIntoFunctionCallParameters=0;
333 	}
334 	sqlite3_stmt *selectNameFromMaster;
335 	sqlite3_stmt *insertIntoFunctionCalls;
336 	sqlite3_stmt *insertIntoFunctionCallParameters;
337 };
SQLLoggerThreadAllocPreparedStatements()338 void* SQLLoggerThreadAllocPreparedStatements()
339 {
340 	SQLPreparedStatements *s = RakNet::OP_NEW<SQLPreparedStatements>(__FILE__,__LINE__);
341 	return s;
342 }
SQLLoggerThreadDeallocPreparedStatements(void * statementStruct)343 void SQLLoggerThreadDeallocPreparedStatements(void* statementStruct)
344 {
345 	SQLPreparedStatements *s = (SQLPreparedStatements *) statementStruct;
346 	if (s->selectNameFromMaster) sqlite3_finalize(s->selectNameFromMaster);
347 	if (s->insertIntoFunctionCalls) sqlite3_finalize(s->insertIntoFunctionCalls);
348 	if (s->insertIntoFunctionCallParameters) sqlite3_finalize(s->insertIntoFunctionCallParameters);
349 	RakNet::OP_DELETE(s,__FILE__,__LINE__);
350 }
DeleteBlobOrText(void * v)351 void DeleteBlobOrText(void* v)
352 {
353 	LogParameter::Free(v);
354 }
ExecSQLLoggingThread(SQLiteServerLoggerPlugin::SQLThreadInput sqlThreadInput,bool * returnOutput,void * perThreadData)355 SQLiteServerLoggerPlugin::SQLThreadOutput ExecSQLLoggingThread(SQLiteServerLoggerPlugin::SQLThreadInput sqlThreadInput, bool *returnOutput, void* perThreadData)
356 {
357 	SQLiteServerLoggerPlugin::CPUThreadOutputNode *cpuOutputNode = sqlThreadInput.cpuOutputNode;
358 	SQLPreparedStatements *preparedStatements = (SQLPreparedStatements*) perThreadData;
359 
360 	*returnOutput=true;
361 	SQLiteServerLoggerPlugin::SQLThreadOutput sqlThreadOutput;
362 	sqlThreadOutput.cpuOutputNode=cpuOutputNode;
363 	sqlite3 *dbHandle = sqlThreadInput.dbHandle;
364 //	sqlite3_stmt *statement;
365 	char *errorMsg;
366 
367 	sqlite3_exec(dbHandle,"BEGIN TRANSACTION", 0, 0, 0);
368 
369 	int rc;
370 	if (cpuOutputNode->isFunctionCall)
371 	{
372 		if (preparedStatements->selectNameFromMaster==0)
373 		{
374 			// Create function tables if they are not there already
375 			if (sqlite3_prepare_v2(
376 				dbHandle,
377 				"SELECT name FROM sqlite_master WHERE type='table' AND name="FUNCTION_CALL_TABLE" ",
378 				-1,
379 				&preparedStatements->selectNameFromMaster,
380 				0
381 				)!=SQLITE_OK)
382 			{
383 				RakAssert("Failed PRAGMA table_info for function tables in SQLiteServerLoggerPlugin.cpp" && 0);
384 				for (int i=0; i < cpuOutputNode->parameterCount; i++)
385 					cpuOutputNode->parameterList[i].Free();
386 				sqlite3_exec(dbHandle,"END TRANSACTION", 0, 0, 0);
387 				return sqlThreadOutput;
388 			}
389 		}
390 		 rc = sqlite3_step(preparedStatements->selectNameFromMaster);
391 		sqlite3_reset(preparedStatements->selectNameFromMaster);
392 
393 		if (rc!=SQLITE_ROW)
394 		{
395 			// Create table if it isn't there already
396 			rc = sqlite3_exec(dbHandle,"CREATE TABLE "FUNCTION_CALL_TABLE" (functionId_pk INTEGER PRIMARY KEY, "FUNCTION_CALL_FRIENDLY_TEXT" TEXT, functionName TEXT, "FILE_COLUMN" TEXT, "LINE_COLUMN" INTEGER, "TICK_COUNT_COLUMN" INTEGER, "AUTO_IP_COLUMN" TEXT, "TIMESTAMP_TEXT_COLUMN" TIMESTAMP DATE DEFAULT (datetime('now','localtime')), "TIMESTAMP_NUMERIC_COLUMN" NUMERIC )", 0, 0, &errorMsg);
397 			RakAssert(rc==SQLITE_OK);
398 			sqlite3_free(errorMsg);
399 			// See sqlDataTypeNames for *val
400 			rc = sqlite3_exec(dbHandle,"CREATE TABLE "FUNCTION_CALL_PARAMETERS_TABLE" (fcpId_pk INTEGER PRIMARY KEY, functionId_fk integer NOT NULL, value TEXT, FOREIGN KEY (functionId_fk) REFERENCES "FUNCTION_CALL_TABLE" (functionId_pk))", 0, 0, &errorMsg);
401 			RakAssert(rc==SQLITE_OK);
402 			sqlite3_free(errorMsg);
403 		}
404 		else
405 		{
406 			// Table already there
407 		}
408 
409 		// Insert into function calls
410 		int parameterCountIndex;
411 		RakNet::RakString functionCallFriendlyText("%s(", cpuOutputNode->tableName.C_String());
412 		for (parameterCountIndex=0; parameterCountIndex < cpuOutputNode->parameterCount; parameterCountIndex++)
413 		{
414 			if (parameterCountIndex!=0)
415 				functionCallFriendlyText+=", ";
416 			switch (cpuOutputNode->parameterList[parameterCountIndex].type)
417 			{
418 			case SQLLPDT_POINTER:
419 				if (cpuOutputNode->parameterList[parameterCountIndex].size==4)
420 					functionCallFriendlyText+=RakNet::RakString("%p", cpuOutputNode->parameterList[parameterCountIndex].data.i);
421 				else
422 					functionCallFriendlyText+=RakNet::RakString("%p", cpuOutputNode->parameterList[parameterCountIndex].data.ll);
423 				break;
424 			case SQLLPDT_INTEGER:
425 				switch (cpuOutputNode->parameterList[parameterCountIndex].size)
426 				{
427 				case 1:
428 					functionCallFriendlyText+=RakNet::RakString("%i", cpuOutputNode->parameterList[parameterCountIndex].data.c);
429 					break;
430 				case 2:
431 					functionCallFriendlyText+=RakNet::RakString("%i", cpuOutputNode->parameterList[parameterCountIndex].data.s);
432 					break;
433 				case 4:
434 					functionCallFriendlyText+=RakNet::RakString("%i", cpuOutputNode->parameterList[parameterCountIndex].data.i);
435 					break;
436 				case 8:
437 					functionCallFriendlyText+=RakNet::RakString("%i", cpuOutputNode->parameterList[parameterCountIndex].data.ll);
438 					break;
439 				}
440 				break;
441 			case SQLLPDT_REAL:
442 				if (cpuOutputNode->parameterList[parameterCountIndex].size==sizeof(float))
443 					functionCallFriendlyText+=RakNet::RakString("%f", cpuOutputNode->parameterList[parameterCountIndex].data.f);
444 				else
445 					functionCallFriendlyText+=RakNet::RakString("%d", cpuOutputNode->parameterList[parameterCountIndex].data.d);
446 				break;
447 			case SQLLPDT_TEXT:
448 				functionCallFriendlyText+='"';
449 				if (cpuOutputNode->parameterList[parameterCountIndex].size>0)
450 					functionCallFriendlyText.AppendBytes(cpuOutputNode->parameterList[parameterCountIndex].data.cptr, cpuOutputNode->parameterList[parameterCountIndex].size);
451 				functionCallFriendlyText+='"';
452 				break;
453 			case SQLLPDT_IMAGE:
454 				functionCallFriendlyText+=RakNet::RakString("<%i byte image>", cpuOutputNode->parameterList[parameterCountIndex].size, cpuOutputNode->parameterList[parameterCountIndex].data.cptr);
455 				break;
456 			case SQLLPDT_BLOB:
457 				functionCallFriendlyText+=RakNet::RakString("<%i byte binary>", cpuOutputNode->parameterList[parameterCountIndex].size, cpuOutputNode->parameterList[parameterCountIndex].data.cptr);
458 				break;
459 			}
460 		}
461 
462 		functionCallFriendlyText+=");";
463 
464 		if (preparedStatements->insertIntoFunctionCalls==0)
465 		{
466 			rc = sqlite3_prepare_v2(dbHandle, "INSERT INTO "FUNCTION_CALL_TABLE" ("FUNCTION_CALL_FRIENDLY_TEXT", "FILE_COLUMN", "LINE_COLUMN", "TICK_COUNT_COLUMN", "AUTO_IP_COLUMN", "TIMESTAMP_NUMERIC_COLUMN" ,functionName) VALUES (?,?,?,?,?,?,?)", -1, &preparedStatements->insertIntoFunctionCalls, 0);
467 			if (rc!=SQLITE_DONE && rc!=SQLITE_OK)
468 			{
469 				RakAssert("Failed INSERT INTO "FUNCTION_CALL_PARAMETERS_TABLE" in SQLiteServerLoggerPlugin.cpp" && 0);
470 			}
471 		}
472 		sqlite3_bind_text(preparedStatements->insertIntoFunctionCalls, 1, functionCallFriendlyText.C_String(), -1, SQLITE_TRANSIENT);
473 		sqlite3_bind_text(preparedStatements->insertIntoFunctionCalls, 2, cpuOutputNode->file.C_String(), -1, SQLITE_TRANSIENT);
474 		sqlite3_bind_int(preparedStatements->insertIntoFunctionCalls, 3, cpuOutputNode->line);
475 		sqlite3_bind_int(preparedStatements->insertIntoFunctionCalls, 4, cpuOutputNode->tickCount);
476 		sqlite3_bind_text(preparedStatements->insertIntoFunctionCalls, 5, cpuOutputNode->ipAddressString, -1, SQLITE_TRANSIENT);
477 		sqlite3_bind_int(preparedStatements->insertIntoFunctionCalls, 6, (uint32_t) (cpuOutputNode->clientSendingTime));
478 		sqlite3_bind_text(preparedStatements->insertIntoFunctionCalls, 7, cpuOutputNode->tableName.C_String(), -1, SQLITE_TRANSIENT);
479 		rc = sqlite3_step(preparedStatements->insertIntoFunctionCalls);
480 		sqlite3_reset(preparedStatements->insertIntoFunctionCalls);
481 		if (rc!=SQLITE_DONE && rc!=SQLITE_OK)
482 		{
483 			RakAssert("Failed binding parameters to functionCalls in SQLiteServerLoggerPlugin.cpp" && 0);
484 		}
485 //		sqlite3_finalize(statement);
486 
487 //		RakNet::RakString insertIntoFunctionCallsQuery("INSERT INTO 'functionCalls' ("FUNCTION_CALL_FRIENDLY_TEXT", "FILE_COLUMN", "LINE_COLUMN", "TICK_COUNT_COLUMN",functionName) VALUES ('%s','%s',%i,%i,'%s') ", functionCallFriendlyText.C_String(), file.C_String(), line, tickCount,tableName.C_String());
488 //		rc = sqlite3_exec(dbHandle,insertIntoFunctionCallsQuery.C_String(), 0, 0, &errorMsg);
489 //		RakAssert(rc==SQLITE_OK);
490 //		sqlite3_free(errorMsg);
491 		// Read last row id
492 		// Requires that this thread has its own connection
493 		sqlite3_int64 lastRowId = sqlite3_last_insert_rowid(dbHandle);
494 
495 		if (preparedStatements->insertIntoFunctionCallParameters==0)
496 		{
497 			rc = sqlite3_prepare_v2(dbHandle, "INSERT INTO functionCallParameters (functionId_fk, value) VALUES (?,?);", -1, &preparedStatements->insertIntoFunctionCallParameters, 0);
498 			RakAssert(rc==SQLITE_OK);
499 			sqlite3_bind_int64(preparedStatements->insertIntoFunctionCallParameters, 1, lastRowId);
500 		}
501 
502 		// Insert into parameters table
503 		for (parameterCountIndex=0; parameterCountIndex < cpuOutputNode->parameterCount; parameterCountIndex++)
504 		{
505 			switch (cpuOutputNode->parameterList[parameterCountIndex].type)
506 			{
507 			case SQLLPDT_POINTER:
508 			case SQLLPDT_INTEGER:
509 				switch (cpuOutputNode->parameterList[parameterCountIndex].size)
510 				{
511 				case 1:
512 					sqlite3_bind_int(preparedStatements->insertIntoFunctionCallParameters, 2, cpuOutputNode->parameterList[parameterCountIndex].data.c);
513 					break;
514 				case 2:
515 					sqlite3_bind_int(preparedStatements->insertIntoFunctionCallParameters, 2, cpuOutputNode->parameterList[parameterCountIndex].data.s);
516 					break;
517 				case 4:
518 					sqlite3_bind_int(preparedStatements->insertIntoFunctionCallParameters, 2, cpuOutputNode->parameterList[parameterCountIndex].data.i);
519 					break;
520 				case 8:
521 					sqlite3_bind_int64(preparedStatements->insertIntoFunctionCallParameters, 2, cpuOutputNode->parameterList[parameterCountIndex].data.ll);
522 					break;
523 				}
524 				break;
525 			case SQLLPDT_REAL:
526 				if (cpuOutputNode->parameterList[parameterCountIndex].size==sizeof(float))
527 					sqlite3_bind_double(preparedStatements->insertIntoFunctionCallParameters, 2, cpuOutputNode->parameterList[parameterCountIndex].data.f);
528 				else
529 					sqlite3_bind_double(preparedStatements->insertIntoFunctionCallParameters, 2, cpuOutputNode->parameterList[parameterCountIndex].data.d);
530 				break;
531 			case SQLLPDT_TEXT:
532 				sqlite3_bind_text(preparedStatements->insertIntoFunctionCallParameters, 2, cpuOutputNode->parameterList[parameterCountIndex].data.cptr, cpuOutputNode->parameterList[parameterCountIndex].size, 0);
533 				break;
534 			case SQLLPDT_IMAGE:
535 			case SQLLPDT_BLOB:
536 				sqlite3_bind_blob(preparedStatements->insertIntoFunctionCallParameters, 2, cpuOutputNode->parameterList[parameterCountIndex].data.vptr, cpuOutputNode->parameterList[parameterCountIndex].size, DeleteBlobOrText);
537 				cpuOutputNode->parameterList[parameterCountIndex].DoNotFree();
538 				break;
539 			default:
540 				RakAssert("Hit invalid default in case label in SQLiteServerLoggerPlugin.cpp" && 0);
541 			}
542 			rc = sqlite3_step(preparedStatements->insertIntoFunctionCallParameters);
543 			sqlite3_reset(preparedStatements->insertIntoFunctionCallParameters);
544 			if (rc!=SQLITE_DONE && rc!=SQLITE_OK)
545 			{
546 				RakAssert("Failed sqlite3_step to bind functionCall parameters in SQLiteServerLoggerPlugin.cpp" && 0);
547 			}
548 		}
549 
550 	//	if (statement)
551 	//		sqlite3_finalize(statement);
552 	}
553 	else
554 	{
555 
556 		sqlite3_stmt *pragmaTableInfo;
557 		RakNet::RakString pragmaQuery("PRAGMA table_info(%s)",cpuOutputNode->tableName.C_String());
558 		if (sqlite3_prepare_v2(
559 			dbHandle,
560 			pragmaQuery.C_String(),
561 			-1,
562 			&pragmaTableInfo,
563 			0
564 			)!=SQLITE_OK)
565 		{
566 			RakAssert("Failed PRAGMA table_info for tableName in SQLiteServerLoggerPlugin.cpp" && 0);
567 			for (int i=0; i < cpuOutputNode->parameterCount; i++)
568 				cpuOutputNode->parameterList[i].Free();
569 			sqlite3_exec(dbHandle,"END TRANSACTION", 0, 0, 0);
570 			return sqlThreadOutput;
571 		}
572 
573 		int rc = sqlite3_step(pragmaTableInfo);
574 		DataStructures::List<RakNet::RakString> existingColumnNames;
575 		DataStructures::List<RakNet::RakString> existingColumnTypes;
576 		char *errorMsg;
577 		while (rc==SQLITE_ROW)
578 		{
579 			/*
580 			int nameColumn;
581 			for (int j=0; j < sqlite3_column_count(statement); j++)
582 			{
583 				if (strcmp(sqlite3_column_name(statement,j),"name")==0)
584 				{
585 					nameColumn=j;
586 					break;
587 				}
588 			}
589 			int typeColumn;
590 			for (int j=0; j < sqlite3_column_count(statement); j++)
591 			{
592 				if (strcmp(sqlite3_column_name(statement,j),"type")==0)
593 				{
594 					typeColumn=j;
595 					break;
596 				}
597 			}
598 			*/
599 			const int nameColumn=1;
600 			const int typeColumn=2;
601 			RakAssert(strcmp(sqlite3_column_name(pragmaTableInfo,nameColumn),"name")==0);
602 			RakAssert(strcmp(sqlite3_column_name(pragmaTableInfo,typeColumn),"type")==0);
603 			RakNet::RakString columnName = sqlite3_column_text(pragmaTableInfo,nameColumn);
604 			RakNet::RakString columnType = sqlite3_column_text(pragmaTableInfo,typeColumn);
605 			existingColumnNames.Push(columnName, __FILE__, __LINE__ );
606 			existingColumnTypes.Push(columnType, __FILE__, __LINE__ );
607 
608 			rc = sqlite3_step(pragmaTableInfo);
609 		}
610 		sqlite3_reset(pragmaTableInfo);
611 		sqlite3_finalize(pragmaTableInfo);
612 		if (rc==SQLITE_ERROR)
613 		{
614 			RakAssert("Failed sqlite3_step in SQLiteServerLoggerPlugin.cpp" && 0);
615 			for (int i=0; i < cpuOutputNode->parameterCount; i++)
616 				cpuOutputNode->parameterList[i].Free();
617 			sqlite3_exec(dbHandle,"END TRANSACTION", 0, 0, 0);
618 			return sqlThreadOutput;
619 		}
620 
621 		int existingColumnNamesIndex,insertingColumnNamesIndex;
622 		if (existingColumnNames.Size()==0)
623 		{
624 			RakNet::RakString createQuery("CREATE TABLE %s (rowId_pk INTEGER PRIMARY KEY, "FILE_COLUMN" TEXT, "LINE_COLUMN" INTEGER, "TICK_COUNT_COLUMN" INTEGER, "AUTO_IP_COLUMN" TEXT, "TIMESTAMP_TEXT_COLUMN" TIMESTAMP DATE DEFAULT (datetime('now','localtime')), "TIMESTAMP_NUMERIC_COLUMN" NUMERIC",cpuOutputNode->tableName.C_String());
625 
626 			for (int i=0; i < cpuOutputNode->parameterCount; i++)
627 			{
628 				createQuery+=", ";
629 				createQuery+=cpuOutputNode->insertingColumnNames[i];
630 				createQuery+=" ";
631 				createQuery+=GetSqlDataTypeName2(cpuOutputNode->parameterList[i].type);
632 			}
633 			createQuery+=" )";
634 
635 			sqlite3_exec(dbHandle,
636 				createQuery.C_String(),
637 				0, 0, &errorMsg);
638 			RakAssert(errorMsg==0);
639 			sqlite3_free(errorMsg);
640 		}
641 		else
642 		{
643 			// Compare what is there (columnNames,columnTypes) to what we are adding. Add what is missing
644 			bool alreadyExists;
645 			for (insertingColumnNamesIndex=0; insertingColumnNamesIndex<(int) cpuOutputNode->insertingColumnNames.Size(); insertingColumnNamesIndex++)
646 			{
647 				alreadyExists=false;
648 				for (existingColumnNamesIndex=0; existingColumnNamesIndex<(int) existingColumnNames.Size(); existingColumnNamesIndex++)
649 				{
650 					if (existingColumnNames[existingColumnNamesIndex]==cpuOutputNode->insertingColumnNames[insertingColumnNamesIndex])
651 					{
652 						// Type mismatch? If so, abort
653 						if (existingColumnTypes[existingColumnNamesIndex]!=GetSqlDataTypeName2(cpuOutputNode->parameterList[insertingColumnNamesIndex].type))
654 						{
655 							printf("Error: Column type mismatch. TableName=%s. ColumnName%s. Existing=%s. New=%s\n",
656 								cpuOutputNode->tableName.C_String(),
657 								existingColumnNames[existingColumnNamesIndex].C_String(),
658 								existingColumnTypes[existingColumnNamesIndex].C_String(),
659 								GetSqlDataTypeName2(cpuOutputNode->parameterList[insertingColumnNamesIndex].type)
660 								);
661 							for (int i=0; i < cpuOutputNode->parameterCount; i++)
662 								cpuOutputNode->parameterList[i].Free();
663 							sqlite3_exec(dbHandle,"END TRANSACTION", 0, 0, 0);
664 							return sqlThreadOutput;
665 						}
666 
667 						alreadyExists=true;
668 						break;
669 					}
670 				}
671 
672 				if (alreadyExists==false)
673 				{
674 					sqlite3_exec(dbHandle,
675 						RakNet::RakString("ALTER TABLE %s ADD %s %s",
676 						cpuOutputNode->tableName.C_String(),
677 						cpuOutputNode->insertingColumnNames[insertingColumnNamesIndex].C_String(),
678 						GetSqlDataTypeName2(cpuOutputNode->parameterList[insertingColumnNamesIndex].type)
679 						).C_String(),
680 						0, 0, &errorMsg);
681 					RakAssert(errorMsg==0);
682 					sqlite3_free(errorMsg);
683 				}
684 			}
685 		}
686 
687 
688 
689 		// Insert new row
690 		RakNet::RakString insertQuery("INSERT INTO %s (", cpuOutputNode->tableName.C_String());
691 		int parameterCountIndex;
692 		for (parameterCountIndex=0; parameterCountIndex<cpuOutputNode->parameterCount; parameterCountIndex++)
693 		{
694 			if (parameterCountIndex!=0)
695 				insertQuery+=", ";
696 			insertQuery+=cpuOutputNode->insertingColumnNames[parameterCountIndex].C_String();
697 		}
698 		// Add file and line to the end
699 		insertQuery+=", "FILE_COLUMN", "LINE_COLUMN", "TICK_COUNT_COLUMN", "AUTO_IP_COLUMN", "TIMESTAMP_NUMERIC_COLUMN" ) VALUES (";
700 
701 		for (parameterCountIndex=0; parameterCountIndex<cpuOutputNode->parameterCount+5; parameterCountIndex++)
702 		{
703 			if (parameterCountIndex!=0)
704 				insertQuery+=", ?";
705 			else
706 				insertQuery+="?";
707 		}
708 		insertQuery+=")";
709 
710 		sqlite3_stmt *customStatement;
711 		if (sqlite3_prepare_v2(
712 			dbHandle,
713 			insertQuery.C_String(),
714 			-1,
715 			&customStatement,
716 			0
717 			)!=SQLITE_OK)
718 		{
719 			RakAssert("Failed second sqlite3_prepare_v2 in SQLiteServerLoggerPlugin.cpp" && 0);
720 			for (int i=0; i < cpuOutputNode->parameterCount; i++)
721 				cpuOutputNode->parameterList[i].Free();
722 			sqlite3_exec(dbHandle,"END TRANSACTION", 0, 0, 0);
723 			return sqlThreadOutput;
724 		}
725 
726 		for (parameterCountIndex=0; parameterCountIndex<cpuOutputNode->parameterCount; parameterCountIndex++)
727 		{
728 			switch (cpuOutputNode->parameterList[parameterCountIndex].type)
729 			{
730 				case SQLLPDT_POINTER:
731 				case SQLLPDT_INTEGER:
732 					switch (cpuOutputNode->parameterList[parameterCountIndex].size)
733 					{
734 					case 1:
735 						sqlite3_bind_int(customStatement, parameterCountIndex+1, cpuOutputNode->parameterList[parameterCountIndex].data.c);
736 						break;
737 					case 2:
738 						sqlite3_bind_int(customStatement, parameterCountIndex+1, cpuOutputNode->parameterList[parameterCountIndex].data.s);
739 						break;
740 					case 4:
741 						sqlite3_bind_int(customStatement, parameterCountIndex+1, cpuOutputNode->parameterList[parameterCountIndex].data.i);
742 						break;
743 					case 8:
744 						sqlite3_bind_int64(customStatement, parameterCountIndex+1, cpuOutputNode->parameterList[parameterCountIndex].data.ll);
745 						break;
746 					}
747 					break;
748 				case SQLLPDT_REAL:
749 					if (cpuOutputNode->parameterList[parameterCountIndex].size==sizeof(float))
750 						sqlite3_bind_double(customStatement, parameterCountIndex+1, cpuOutputNode->parameterList[parameterCountIndex].data.f);
751 					else
752 						sqlite3_bind_double(customStatement, parameterCountIndex+1, cpuOutputNode->parameterList[parameterCountIndex].data.d);
753 					break;
754 				case SQLLPDT_TEXT:
755 					sqlite3_bind_text(customStatement, parameterCountIndex+1, cpuOutputNode->parameterList[parameterCountIndex].data.cptr, cpuOutputNode->parameterList[parameterCountIndex].size, DeleteBlobOrText);
756 					cpuOutputNode->parameterList[parameterCountIndex].DoNotFree();
757 					break;
758 				case SQLLPDT_IMAGE:
759 				case SQLLPDT_BLOB:
760 					sqlite3_bind_blob(customStatement, parameterCountIndex+1, cpuOutputNode->parameterList[parameterCountIndex].data.vptr, cpuOutputNode->parameterList[parameterCountIndex].size, DeleteBlobOrText);
761 					cpuOutputNode->parameterList[parameterCountIndex].DoNotFree();
762 					break;
763 			}
764 		}
765 
766 		// Add file and line to the end
767 		sqlite3_bind_text(customStatement, parameterCountIndex+1, cpuOutputNode->file.C_String(), (int) cpuOutputNode->file.GetLength(), SQLITE_TRANSIENT);
768 		sqlite3_bind_int(customStatement, parameterCountIndex+2, cpuOutputNode->line);
769 		sqlite3_bind_int(customStatement, parameterCountIndex+3, cpuOutputNode->tickCount);
770 		sqlite3_bind_text(customStatement, parameterCountIndex+4, cpuOutputNode->ipAddressString, -1, SQLITE_TRANSIENT);
771 		sqlite3_bind_int(customStatement, parameterCountIndex+5, (uint32_t) (cpuOutputNode->clientSendingTime));
772 
773 		rc = sqlite3_step(customStatement);
774 		if (rc!=SQLITE_DONE && rc!=SQLITE_OK)
775 		{
776 			RakAssert("Failed sqlite3_step to bind blobs in SQLiteServerLoggerPlugin.cpp" && 0);
777 			for (int i=0; i < cpuOutputNode->parameterCount; i++)
778 				cpuOutputNode->parameterList[i].Free();
779 			sqlite3_exec(dbHandle,"END TRANSACTION", 0, 0, 0);
780 			return sqlThreadOutput;
781 		}
782 		sqlite3_finalize(customStatement);
783 	}
784 	sqlite3_exec(dbHandle,"END TRANSACTION", 0, 0, 0);
785 
786 	for (int i=0; i < cpuOutputNode->parameterCount; i++)
787 		cpuOutputNode->parameterList[i].Free();
788 
789 	return sqlThreadOutput;
790 }
791 
SQLiteServerLoggerPlugin()792 SQLiteServerLoggerPlugin::SQLiteServerLoggerPlugin()
793 {
794 	sessionManagementMode=CREATE_EACH_NAMED_DB_HANDLE;
795 	createDirectoryForFile=true;
796 	cpuThreadInput=0;
797 	dxtCompressionEnabled=false;
798 }
799 
~SQLiteServerLoggerPlugin()800 SQLiteServerLoggerPlugin::~SQLiteServerLoggerPlugin()
801 {
802 	StopCPUSQLThreads();
803 	RakNet::OP_DELETE(cpuThreadInput,__FILE__,__LINE__);
804 	CloseAllSessions();
805 }
Update(void)806 void SQLiteServerLoggerPlugin::Update(void)
807 {
808 	SQLite3ServerPlugin::Update();
809 
810 	bool hadOutput=false;
811 
812 	int arrayIndex;
813 //	unsigned int i;
814 
815 	PushCpuThreadInputIfNecessary();
816 
817 	while (cpuLoggerThreadPool.HasOutputFast() && cpuLoggerThreadPool.HasOutput())
818 	{
819 		CPUThreadOutput* cpuThreadOutput=cpuLoggerThreadPool.GetOutput();
820 		for (arrayIndex=0; arrayIndex < cpuThreadOutput->arraySize; arrayIndex++)
821 		{
822 			SQLThreadInput sqlThreadInput;
823 			CPUThreadOutputNode *outputNode = cpuThreadOutput->cpuOutputNodeArray[arrayIndex];
824 			sqlThreadInput.cpuOutputNode=outputNode;
825 			// bool alreadyHasLoggedInSession=false;
826 			int sessionIndex;
827 			for (sessionIndex=0; sessionIndex < loggedInSessions.Size(); sessionIndex++)
828 			{
829 				if (loggedInSessions[sessionIndex].systemAddress==outputNode->packet->systemAddress &&
830 					loggedInSessions[sessionIndex].sessionName==outputNode->dbIdentifier)
831 				{
832 					break;
833 				}
834 			}
835 
836 			DataStructures::DefaultIndexType idx;
837 			if (sessionManagementMode==USE_ANY_DB_HANDLE)
838 			{
839 				if (dbHandles.GetSize()>0)
840 					idx=0;
841 				else
842 					idx=-1;
843 			}
844 			else
845 			{
846 				idx = dbHandles.GetIndexOf(outputNode->dbIdentifier);
847 				if (sessionIndex==loggedInSessions.Size() && (createDirectoryForFile==true || idx==-1) && sessionManagementMode==CREATE_EACH_NAMED_DB_HANDLE)
848 				{
849 					// Create it, and set idx to the new one
850 					idx = CreateDBHandle(outputNode->dbIdentifier);
851 				}
852 				else if (idx==-1)
853 				{
854 					if (sessionManagementMode==CREATE_EACH_NAMED_DB_HANDLE || sessionManagementMode==CREATE_SHARED_NAMED_DB_HANDLE)
855 					{
856 						// Create it, and set idx to the new one
857 						idx = CreateDBHandle(outputNode->dbIdentifier);
858 					}
859 					if (sessionManagementMode==USE_NAMED_DB_HANDLE)
860 					{
861 						RakAssert("Can't find named DB handle\n" && 0);
862 					}
863 				}
864 				else
865 				{
866 					// Use idx
867 				}
868 			}
869 
870 			if (idx==-1)
871 			{
872 				DeallocPacketUnified(outputNode->packet);
873 				RakNet::OP_DELETE(outputNode,__FILE__,__LINE__);
874 			}
875 			else
876 			{
877 				if (sessionIndex==loggedInSessions.Size())
878 				{
879 					SessionNameAndSystemAddress sassy;
880 					sassy.sessionName=outputNode->dbIdentifier;
881 					sassy.systemAddress=outputNode->packet->systemAddress;
882 					sassy.referencedPointer=dbHandles[idx].dbHandle;
883 					RakNetTimeMS curTime = RakNet::GetTimeMS();
884 					RakNetTimeMS dbAge = curTime - dbHandles[idx].whenCreated;
885 //					RakNetTimeMS timeDelta = outputNode->clientSendingTime - curTime;
886 					sassy.timestampDelta=dbAge - outputNode->clientSendingTime ;
887 					// sassy.dbAgeWhenCreated=dbHandles[idx].whenCreated;
888 					loggedInSessions.Push(sassy, __FILE__, __LINE__ );
889 					sessionIndex=loggedInSessions.Size()-1;
890 				}
891 
892 				DeallocPacketUnified(outputNode->packet);
893 				sqlThreadInput.dbHandle=dbHandles[idx].dbHandle;
894 				outputNode->clientSendingTime+=loggedInSessions[sessionIndex].timestampDelta;
895 				sqlLoggerThreadPool.AddInput(ExecSQLLoggingThread, sqlThreadInput);
896 			}
897 		}
898 
899 //		RakNet::OP_DELETE_ARRAY(cpuThreadOutput->cpuOutputNodeArray);
900 		RakNet::OP_DELETE(cpuThreadOutput,__FILE__,__LINE__);
901 	}
902 
903 	while (sqlLoggerThreadPool.HasOutputFast() && sqlLoggerThreadPool.HasOutput())
904 	{
905 		hadOutput=true;
906 		RakNet::OP_DELETE(sqlLoggerThreadPool.GetOutput().cpuOutputNode,__FILE__,__LINE__);
907 	}
908 
909 	if (hadOutput)
910 		CloseUnreferencedSessions();
911 }
OnReceive(Packet * packet)912 PluginReceiveResult SQLiteServerLoggerPlugin::OnReceive(Packet *packet)
913 {
914 	PluginReceiveResult prr = SQLite3ServerPlugin::OnReceive(packet);
915 	if (prr!=RR_CONTINUE_PROCESSING)
916 		return prr;
917 
918 	switch (packet->data[0])
919 	{
920 	case ID_SQLLITE_LOGGER:
921 		{
922 			RakNet::BitStream bitStream(packet->data, packet->length, false);
923 			bitStream.IgnoreBytes(1);
924 			RakNet::RakString dbIdentifier;
925 			bitStream.Read(dbIdentifier);
926 
927 			if (sessionManagementMode==CREATE_EACH_NAMED_DB_HANDLE)
928 			{
929 				unsigned char senderAddr[32];
930 				packet->systemAddress.ToString(true,(char*) senderAddr);
931 				dbIdentifier+=':';
932 				dbIdentifier+=senderAddr;
933 			}
934 
935 			CPUThreadInput *ti = LockCpuThreadInput();
936 			ti->cpuInputArray[ti->arraySize].packet=packet;
937 		//	ti->cpuInputArray[ti->arraySize].whenMessageArrived=RakNet::GetTimeMS();
938 			ti->cpuInputArray[ti->arraySize].dbIdentifier=dbIdentifier;
939 			UnlockCpuThreadInput();
940 
941 
942 			/*
943 			unsigned int i;
944 			bool alreadyHasLoggedInSession=false;
945 			for (i=0; i < loggedInSessions.Size(); i++)
946 			{
947 				if (loggedInSessions[i].systemAddress==packet->systemAddress &&
948 					loggedInSessions[i].sessionName==dbIdentifier)
949 				{
950 					alreadyHasLoggedInSession=true;
951 					break;
952 				}
953 			}
954 
955 			DataStructures::DefaultIndexType idx;
956 			if (sessionManagementMode==USE_ANY_DB_HANDLE)
957 			{
958 				if (dbHandles.GetSize()>0)
959 					idx=0;
960 				else
961 					idx=-1;
962 			}
963 			else
964 			{
965 				idx = dbHandles.GetIndexOf(dbIdentifier);
966 				if (alreadyHasLoggedInSession==false && (createDirectoryForFile==true || idx==-1) && sessionManagementMode==CREATE_EACH_NAMED_DB_HANDLE)
967 				{
968 					// Create it, and set idx to the new one
969 					idx = CreateDBHandle(dbIdentifier);
970 				}
971 				else if (idx==-1)
972 				{
973 					if (sessionManagementMode==CREATE_EACH_NAMED_DB_HANDLE || sessionManagementMode==CREATE_SHARED_NAMED_DB_HANDLE)
974 					{
975 						// Create it, and set idx to the new one
976 						idx = CreateDBHandle(dbIdentifier);
977 					}
978 					if (sessionManagementMode==USE_NAMED_DB_HANDLE)
979 					{
980 						RakAssert("Can't find named DB handle\n" && 0);
981 					}
982 				}
983 				else
984 				{
985 					// Use idx
986 				}
987 			}
988 
989 			if (idx==-1)
990 			{
991 				return RR_STOP_PROCESSING_AND_DEALLOCATE;
992 			}
993 
994 			if (alreadyHasLoggedInSession==false)
995 			{
996 				SessionNameAndSystemAddress sassy;
997 				sassy.sessionName=dbIdentifier;
998 				sassy.systemAddress=packet->systemAddress;
999 				sassy.referencedPointer=dbHandles[idx].dbHandle;
1000 				loggedInSessions.Push(sassy);
1001 			}
1002 
1003 			SQLExecThreadInput input;
1004 			input.dbHandle=dbHandles[idx].dbHandle;
1005 			input.packet=packet;
1006 			input.whenMessageArrived=RakNet::GetTimeMS()-dbHandles[idx].whenCreated;
1007 			__sqlThreadPool.AddInput(ExecSQLLoggingThread, input);
1008 //			printf("Pending Queries: %i\n", __sqlThreadPool.InputSize());
1009 */
1010 			return RR_STOP_PROCESSING;
1011 		}
1012 	}
1013 	return RR_CONTINUE_PROCESSING;
1014 }
OnClosedConnection(SystemAddress systemAddress,RakNetGUID rakNetGUID,PI2_LostConnectionReason lostConnectionReason)1015 void SQLiteServerLoggerPlugin::OnClosedConnection(SystemAddress systemAddress, RakNetGUID rakNetGUID, PI2_LostConnectionReason lostConnectionReason )
1016 {
1017 	RakNet::RakString removedSession;
1018 	unsigned int i=0;
1019 	while (i < loggedInSessions.Size())
1020 	{
1021 		if (loggedInSessions[i].systemAddress==systemAddress)
1022 		{
1023 			removedSession=loggedInSessions[i].sessionName;
1024 			loggedInSessions.RemoveAtIndexFast(i);
1025 
1026 			/*
1027 			unsigned int j;
1028 			bool removedSessionReferenced=false;
1029 			for (j=0; j < loggedInSessions.Size(); j++)
1030 			{
1031 				if (loggedInSessions[j].sessionName==removedSession)
1032 				{
1033 					removedSessionReferenced=true;
1034 					break;
1035 				}
1036 			}
1037 			if (removedSessionReferenced==false)
1038 			{
1039 //				if (__sqlThreadPool.InputSize()>0 || __sqlThreadPool.IsWorking())
1040 //					return;
1041 
1042 //				RemoveDBHandle(removedSession, sessionManagementMode==CREATE_EACH_NAMED_DB_HANDLE||sessionManagementMode==CREATE_ONE_NAMED_DB_HANDLE);
1043 			}
1044 			*/
1045 		}
1046 		else
1047 			i++;
1048 	}
1049 
1050 	CloseUnreferencedSessions();
1051 }
CloseUnreferencedSessions(void)1052 void SQLiteServerLoggerPlugin::CloseUnreferencedSessions(void)
1053 {
1054 	DataStructures::List<sqlite3 *> sessionNames;
1055 	unsigned int j;
1056 	for (j=0; j < loggedInSessions.Size(); j++)
1057 	{
1058 		if (sessionNames.GetIndexOf(loggedInSessions[j].referencedPointer)==-1)
1059 			sessionNames.Push(loggedInSessions[j].referencedPointer, __FILE__, __LINE__ );
1060 	}
1061 
1062 	DataStructures::List<sqlite3*> unreferencedHandles;
1063 	bool isReferenced;
1064 	for (unsigned int i=0; i < dbHandles.GetSize(); i++)
1065 	{
1066 		if (dbHandles[i].dbAutoCreated)
1067 		{
1068 			j=0;
1069 			isReferenced=false;
1070 			for (j=0; j < sessionNames.Size(); j++)
1071 			{
1072 				if (sessionNames[j]==dbHandles[i].dbHandle)
1073 				{
1074 					isReferenced=true;
1075 					break;
1076 				}
1077 			}
1078 
1079 			if (isReferenced==false)
1080 			{
1081 				unreferencedHandles.Push(dbHandles[i].dbHandle,__FILE__,__LINE__);
1082 			}
1083 		}
1084 	}
1085 
1086 	if (unreferencedHandles.Size())
1087 	{
1088 		sqlLoggerThreadPool.LockInput();
1089 		if (sqlLoggerThreadPool.HasInputFast()==false)
1090 		{
1091 			RakSleep(100);
1092 			while (sqlLoggerThreadPool.NumThreadsWorking()>0)
1093 				RakSleep(30);
1094 			for (unsigned int k=0; k < unreferencedHandles.Size(); k++)
1095 			{
1096 				RemoveDBHandle(unreferencedHandles[k], true);
1097 			}
1098 		}
1099 		sqlLoggerThreadPool.UnlockInput();
1100 
1101 		if (dbHandles.GetSize()==0)
1102 			StopCPUSQLThreads();
1103 	}
1104 }
CloseAllSessions(void)1105 void SQLiteServerLoggerPlugin::CloseAllSessions(void)
1106 {
1107 	loggedInSessions.Clear(false, __FILE__, __LINE__);
1108 	CloseUnreferencedSessions();
1109 }
CreateDBHandle(RakNet::RakString dbIdentifier)1110 unsigned int SQLiteServerLoggerPlugin::CreateDBHandle(RakNet::RakString dbIdentifier)
1111 {
1112 	if (sessionManagementMode!=CREATE_EACH_NAMED_DB_HANDLE && sessionManagementMode!=CREATE_SHARED_NAMED_DB_HANDLE)
1113 		return dbHandles.GetIndexOf(dbIdentifier);
1114 
1115 	RakNet::RakString filePath = newDatabaseFilePath;
1116 	if (createDirectoryForFile)
1117 	{
1118 		filePath+=dbIdentifier;
1119 		filePath.TerminateAtLastCharacter('.');
1120 		filePath.MakeFilePath();
1121 
1122 		time_t     now;
1123 		struct tm  *ts;
1124 		char       buf[80];
1125 
1126 		/* Get the current time */
1127 		now = time(NULL);
1128 
1129 		/* Format and print the time, "ddd yyyy-mm-dd hh:mm:ss zzz" */
1130 		ts = localtime(&now);
1131 		strftime(buf, sizeof(buf), "__%a_%Y-%m-%d__%H;%M", ts);
1132 
1133 		filePath+=buf;
1134 		filePath+=RakNet::RakString("__%i", RakNet::GetTimeMS());
1135 
1136 		filePath.MakeFilePath();
1137 	}
1138 
1139 
1140 	// With no file data, just creates the directory structure
1141 	WriteFileWithDirectories(filePath.C_String(), 0, 0);
1142 
1143 	RakNet::RakString fileSafeDbIdentifier = dbIdentifier;
1144 	fileSafeDbIdentifier.TerminateAtLastCharacter(':');
1145 	RakNet::RakString fileNameWithPath=filePath+fileSafeDbIdentifier;
1146 
1147 	// SQL Open this file, and register it
1148 	sqlite3 *database;
1149 	if (sqlite3_open_v2(fileNameWithPath.C_String(), &database, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, 0)!=SQLITE_OK)
1150 	{
1151 		RakAssert("sqlite3_open_v2 failed in SQLiteServerLoggerPlugin.cpp" && 0);
1152 		return -1;
1153 	}
1154 	if (AddDBHandle(dbIdentifier, database, true))
1155 	{
1156 
1157 		char *errorMsg;
1158 		int rc = sqlite3_exec(database,"PRAGMA synchronous=OFF", 0, 0, &errorMsg);
1159 		RakAssert(rc==SQLITE_OK);
1160 		sqlite3_free(errorMsg);
1161 		rc = sqlite3_exec(database,"PRAGMA count_changes=OFF", 0, 0, &errorMsg);
1162 		RakAssert(rc==SQLITE_OK);
1163 		sqlite3_free(errorMsg);
1164 
1165 		printf("Created %s\n", fileNameWithPath.C_String());
1166 		return dbHandles.GetIndexOf(dbIdentifier);
1167 	}
1168 	else
1169 	{
1170 		RakAssert("Failed to call AddDbHandle" && 0);
1171 		return -1;
1172 	}
1173 	return -1;
1174 }
SetSessionManagementMode(SessionManagementMode _sessionManagementMode,bool _createDirectoryForFile,const char * _newDatabaseFilePath)1175 void SQLiteServerLoggerPlugin::SetSessionManagementMode(SessionManagementMode _sessionManagementMode, bool _createDirectoryForFile, const char *_newDatabaseFilePath)
1176 {
1177 	sessionManagementMode=_sessionManagementMode;
1178 	createDirectoryForFile=_createDirectoryForFile;
1179 	newDatabaseFilePath=_newDatabaseFilePath;
1180 	newDatabaseFilePath.MakeFilePath();
1181 }
OnShutdown(void)1182 void SQLiteServerLoggerPlugin::OnShutdown(void)
1183 {
1184 	CloseAllSessions();
1185 }
StopCPUSQLThreads(void)1186 void SQLiteServerLoggerPlugin::StopCPUSQLThreads(void)
1187 {
1188 	unsigned int i;
1189 	int j,k;
1190 	cpuLoggerThreadPool.StopThreads();
1191 	ClearCpuThreadInput();
1192 	for (i=0; i < cpuLoggerThreadPool.InputSize(); i++)
1193 	{
1194 		CPUThreadInput *cpuThreadInput = cpuLoggerThreadPool.GetInputAtIndex(i);
1195 		for (j=0; j < cpuThreadInput->arraySize; j++)
1196 		{
1197 			DeallocPacketUnified(cpuThreadInput->cpuInputArray[j].packet);
1198 		}
1199 		RakNet::OP_DELETE(cpuThreadInput,__FILE__,__LINE__);
1200 	}
1201 	cpuLoggerThreadPool.ClearInput();
1202 	for (i=0; i < cpuLoggerThreadPool.OutputSize(); i++)
1203 	{
1204 		CPUThreadOutput *cpuThreadOutput = cpuLoggerThreadPool.GetOutputAtIndex(i);
1205 		for (j=0; j < cpuThreadOutput->arraySize; j++)
1206 		{
1207 			CPUThreadOutputNode *cpuThreadOutputNode = cpuThreadOutput->cpuOutputNodeArray[j];
1208 			DeallocPacketUnified(cpuThreadOutputNode->packet);
1209 			for (k=0; k < cpuThreadOutputNode->parameterCount; k++)
1210 				cpuThreadOutputNode->parameterList[k].Free();
1211 			RakNet::OP_DELETE(cpuThreadOutputNode,__FILE__,__LINE__);
1212 		}
1213 //		RakNet::OP_DELETE_ARRAY(cpuThreadOutput->cpuOutputNodeArray,__FILE__,__LINE__);
1214 		RakNet::OP_DELETE(cpuThreadOutput,__FILE__,__LINE__);
1215 	}
1216 	cpuLoggerThreadPool.ClearOutput();
1217 
1218 	sqlLoggerThreadPool.StopThreads();
1219 	for (i=0; i < sqlLoggerThreadPool.InputSize(); i++)
1220 		RakNet::OP_DELETE(sqlLoggerThreadPool.GetInputAtIndex(i).cpuOutputNode,__FILE__,__LINE__);
1221 	sqlLoggerThreadPool.ClearInput();
1222 	for (i=0; i < sqlLoggerThreadPool.OutputSize(); i++)
1223 		RakNet::OP_DELETE(sqlLoggerThreadPool.GetOutputAtIndex(i).cpuOutputNode,__FILE__,__LINE__);
1224 	sqlLoggerThreadPool.ClearOutput();
1225 }
GetProcessingStatus(ProcessingStatus * processingStatus)1226 void SQLiteServerLoggerPlugin::GetProcessingStatus(ProcessingStatus *processingStatus)
1227 {
1228 	if (cpuThreadInput)
1229 		processingStatus->packetsBuffered=cpuThreadInput->arraySize;
1230 	else
1231 		processingStatus->packetsBuffered=0;
1232 	processingStatus->cpuPendingProcessing=cpuLoggerThreadPool.InputSize();
1233 	processingStatus->cpuProcessedAwaitingDeallocation=cpuLoggerThreadPool.OutputSize();
1234 	processingStatus->cpuNumThreadsWorking=cpuLoggerThreadPool.NumThreadsWorking();
1235 	processingStatus->sqlPendingProcessing=sqlLoggerThreadPool.InputSize();
1236 	processingStatus->sqlProcessedAwaitingDeallocation=sqlLoggerThreadPool.OutputSize();
1237 	processingStatus->sqlNumThreadsWorking=sqlLoggerThreadPool.NumThreadsWorking();
1238 }
1239 
LockCpuThreadInput(void)1240 SQLiteServerLoggerPlugin::CPUThreadInput *SQLiteServerLoggerPlugin::LockCpuThreadInput(void)
1241 {
1242 	if (cpuThreadInput==0)
1243 	{
1244 		cpuThreadInput=RakNet::OP_NEW<CPUThreadInput>(__FILE__,__LINE__);
1245 		cpuThreadInput->arraySize=0;
1246 		whenCpuThreadInputAllocated=RakNet::GetTime();
1247 	}
1248 	return cpuThreadInput;
1249 }
ClearCpuThreadInput(void)1250 void SQLiteServerLoggerPlugin::ClearCpuThreadInput(void)
1251 {
1252 	if (cpuThreadInput!=0)
1253 	{
1254 		for (int i=0; i < cpuThreadInput->arraySize; i++)
1255 			DeallocPacketUnified(cpuThreadInput->cpuInputArray[i].packet);
1256 		RakNet::OP_DELETE(cpuThreadInput,__FILE__,__LINE__);
1257 		cpuThreadInput=0;
1258 	}
1259 }
UnlockCpuThreadInput(void)1260 void SQLiteServerLoggerPlugin::UnlockCpuThreadInput(void)
1261 {
1262 	cpuThreadInput->arraySize++;
1263 	if (cpuThreadInput->arraySize==MAX_PACKETS_PER_CPU_INPUT_THREAD)
1264 		PushCpuThreadInput();
1265 	else
1266 		PushCpuThreadInputIfNecessary();
1267 }
1268 
CancelLockCpuThreadInput(void)1269 void SQLiteServerLoggerPlugin::CancelLockCpuThreadInput(void)
1270 {
1271 	if (cpuThreadInput->arraySize==0)
1272 	{
1273 		RakNet::OP_DELETE(cpuThreadInput,__FILE__,__LINE__);
1274 		cpuThreadInput=0;
1275 	}
1276 }
PushCpuThreadInput(void)1277 void SQLiteServerLoggerPlugin::PushCpuThreadInput(void)
1278 {
1279 	// cpu threads can probably be as many as I want
1280 	if (cpuLoggerThreadPool.WasStarted()==false)
1281 	{
1282 		if (dxtCompressionEnabled)
1283 			cpuLoggerThreadPool.StartThreads(1,0,InitDxt, DeinitDxt);
1284 		else
1285 			cpuLoggerThreadPool.StartThreads(1,0,0, 0);
1286 	}
1287 	// sql logger threads should probably be limited to 1 since I'm doing transaction locks and calling sqlite3_last_insert_rowid
1288 	if (sqlLoggerThreadPool.WasStarted()==false)
1289 		sqlLoggerThreadPool.StartThreads(1,0, SQLLoggerThreadAllocPreparedStatements, SQLLoggerThreadDeallocPreparedStatements);
1290 
1291 	cpuLoggerThreadPool.AddInput(ExecCPULoggingThread, cpuThreadInput);
1292 	cpuThreadInput=0;
1293 }
PushCpuThreadInputIfNecessary(void)1294 void SQLiteServerLoggerPlugin::PushCpuThreadInputIfNecessary(void)
1295 {
1296 	RakNetTime curTime = RakNet::GetTime();
1297 	if (cpuThreadInput && curTime-whenCpuThreadInputAllocated>MAX_TIME_TO_BUFFER_PACKETS)
1298 		PushCpuThreadInput();
1299 }
OnAttach(void)1300 void SQLiteServerLoggerPlugin::OnAttach(void)
1301 {
1302 }
OnDetach(void)1303 void SQLiteServerLoggerPlugin::OnDetach(void)
1304 {
1305 }
SetEnableDXTCompression(bool enable)1306 void SQLiteServerLoggerPlugin::SetEnableDXTCompression(bool enable)
1307 {
1308 	dxtCompressionEnabled=enable;
1309 }
1310