1 #include "SQLite3ServerPlugin.h"
2 #include "MessageIdentifiers.h"
3 #include "BitStream.h"
4 #include "GetTime.h"
5 
6 using namespace RakNet;
7 
operator <(const DataStructures::MLKeyRef<RakNet::RakString> & inputKey,const SQLite3ServerPlugin::NamedDBHandle & cls)8 bool operator<( const DataStructures::MLKeyRef<RakNet::RakString> &inputKey, const SQLite3ServerPlugin::NamedDBHandle &cls ) {return inputKey.Get() < cls.dbIdentifier;}
operator >(const DataStructures::MLKeyRef<RakNet::RakString> & inputKey,const SQLite3ServerPlugin::NamedDBHandle & cls)9 bool operator>( const DataStructures::MLKeyRef<RakNet::RakString> &inputKey, const SQLite3ServerPlugin::NamedDBHandle &cls ) {return inputKey.Get() > cls.dbIdentifier;}
operator ==(const DataStructures::MLKeyRef<RakNet::RakString> & inputKey,const SQLite3ServerPlugin::NamedDBHandle & cls)10 bool operator==( const DataStructures::MLKeyRef<RakNet::RakString> &inputKey, const SQLite3ServerPlugin::NamedDBHandle &cls ) {return inputKey.Get() == cls.dbIdentifier;}
11 
12 
PerRowCallback(void * userArgument,int argc,char ** argv,char ** azColName)13 int PerRowCallback(void *userArgument, int argc, char **argv, char **azColName)
14 {
15 	SQLite3Table *outputTable = (SQLite3Table*)userArgument;
16 	DataStructures::DefaultIndexType idx;
17 	if (outputTable->columnNames.GetSize()==0)
18 	{
19 		for (idx=0; idx < (DataStructures::DefaultIndexType) argc; idx++)
20 			outputTable->columnNames.Push(azColName[idx], __FILE__, __LINE__ );
21 	}
22 	SQLite3Row *row = RakNet::OP_NEW<SQLite3Row>(__FILE__,__LINE__);
23 	outputTable->rows.Push(row,__FILE__,__LINE__);
24 	for (idx=0; idx < (DataStructures::DefaultIndexType) argc; idx++)
25 	{
26 		if (argv[idx])
27 			row->entries.Push(argv[idx], __FILE__, __LINE__ );
28 		else
29 			row->entries.Push("", __FILE__, __LINE__ );
30 	}
31 	return 0;
32 }
SQLite3ServerPlugin()33 SQLite3ServerPlugin::SQLite3ServerPlugin()
34 {
35 }
~SQLite3ServerPlugin()36 SQLite3ServerPlugin::~SQLite3ServerPlugin()
37 {
38 	StopThreads();
39 }
AddDBHandle(RakNet::RakString dbIdentifier,sqlite3 * dbHandle,bool dbAutoCreated)40 bool SQLite3ServerPlugin::AddDBHandle(RakNet::RakString dbIdentifier, sqlite3 *dbHandle, bool dbAutoCreated)
41 {
42 	if (dbIdentifier.IsEmpty())
43 		return false;
44 	DataStructures::DefaultIndexType idx = dbHandles.GetInsertionIndex(dbIdentifier);
45 	if (idx==(DataStructures::DefaultIndexType)-1)
46 		return false;
47 	NamedDBHandle ndbh;
48 	ndbh.dbHandle=dbHandle;
49 	ndbh.dbIdentifier=dbIdentifier;
50 	ndbh.dbAutoCreated=dbAutoCreated;
51 	ndbh.whenCreated=RakNet::GetTimeMS();
52 	dbHandles.InsertAtIndex(ndbh,idx,__FILE__,__LINE__);
53 
54 #ifdef SQLite3_STATEMENT_EXECUTE_THREADED
55 	if (sqlThreadPool.WasStarted()==false)
56 		sqlThreadPool.StartThreads(1,0);
57 #endif
58 
59 	return true;
60 }
RemoveDBHandle(RakNet::RakString dbIdentifier,bool alsoCloseConnection)61 void SQLite3ServerPlugin::RemoveDBHandle(RakNet::RakString dbIdentifier, bool alsoCloseConnection)
62 {
63 	DataStructures::DefaultIndexType idx = dbHandles.GetIndexOf(dbIdentifier);
64 	if (idx!=(DataStructures::DefaultIndexType)-1)
65 	{
66 		if (alsoCloseConnection)
67 		{
68 			printf("Closed %s\n", dbIdentifier.C_String());
69 			sqlite3_close(dbHandles[idx].dbHandle);
70 		}
71 		dbHandles.RemoveAtIndex(idx,__FILE__,__LINE__);
72 #ifdef SQLite3_STATEMENT_EXECUTE_THREADED
73 	if (dbHandles.GetSize()==0)
74 		StopThreads();
75 #endif // SQLite3_STATEMENT_EXECUTE_THREADED
76 	}
77 }
RemoveDBHandle(sqlite3 * dbHandle,bool alsoCloseConnection)78 void SQLite3ServerPlugin::RemoveDBHandle(sqlite3 *dbHandle, bool alsoCloseConnection)
79 {
80 	DataStructures::DefaultIndexType idx;
81 	for (idx=0; idx < dbHandles.GetSize(); idx++)
82 	{
83 		if (dbHandles[idx].dbHandle==dbHandle)
84 		{
85 			if (alsoCloseConnection)
86 			{
87 				printf("Closed %s\n", dbHandles[idx].dbIdentifier.C_String());
88 				sqlite3_close(dbHandles[idx].dbHandle);
89 			}
90 			dbHandles.RemoveAtIndex(idx,__FILE__,__LINE__);
91 #ifdef SQLite3_STATEMENT_EXECUTE_THREADED
92 			if (dbHandles.GetSize()==0)
93 				StopThreads();
94 #endif // SQLite3_STATEMENT_EXECUTE_THREADED
95 			return;
96 		}
97 	}
98 }
99 #ifdef SQLite3_STATEMENT_EXECUTE_THREADED
Update(void)100 void SQLite3ServerPlugin::Update(void)
101 {
102 	SQLExecThreadOutput output;
103 	while (sqlThreadPool.HasOutputFast() && sqlThreadPool.HasOutput())
104 	{
105 		output = sqlThreadPool.GetOutput();
106 		RakNet::BitStream bsOut((unsigned char*) output.data, output.length,false);
107 		SendUnified(&bsOut, MEDIUM_PRIORITY,RELIABLE_ORDERED,0,output.sender,false);
108 		rakFree_Ex(output.data,__FILE__,__LINE__);
109 	}
110 }
ExecStatementThread(SQLite3ServerPlugin::SQLExecThreadInput threadInput,bool * returnOutput,void * perThreadData)111 SQLite3ServerPlugin::SQLExecThreadOutput ExecStatementThread(SQLite3ServerPlugin::SQLExecThreadInput threadInput, bool *returnOutput, void* perThreadData)
112 {
113 	unsigned int queryId;
114 	RakNet::RakString dbIdentifier;
115 	RakNet::RakString inputStatement;
116 	RakNet::BitStream bsIn((unsigned char*) threadInput.data, threadInput.length, false);
117 	bsIn.IgnoreBytes(sizeof(MessageID));
118 	bsIn.Read(queryId);
119 	bsIn.Read(dbIdentifier);
120 	bsIn.Read(inputStatement);
121 	// bool isRequest;
122 	// bsIn.Read(isRequest);
123 	bsIn.IgnoreBits(1);
124 
125 	char *errorMsg;
126 	RakNet::RakString errorMsgStr;
127 	SQLite3Table outputTable;
128 	sqlite3_exec(threadInput.dbHandle, inputStatement.C_String(), PerRowCallback, &outputTable, &errorMsg);
129 	if (errorMsg)
130 	{
131 		errorMsgStr=errorMsg;
132 		sqlite3_free(errorMsg);
133 	}
134 
135 	RakNet::BitStream bsOut;
136 	bsOut.Write((MessageID)ID_SQLite3_EXEC);
137 	bsOut.Write(queryId);
138 	bsOut.Write(dbIdentifier);
139 	bsOut.Write(inputStatement);
140 	bsOut.Write(false);
141 	bsOut.Write(errorMsgStr);
142 	outputTable.Serialize(&bsOut);
143 
144 	// Free input data
145 	rakFree_Ex(threadInput.data,__FILE__,__LINE__);
146 
147 	// Copy to output data
148 	SQLite3ServerPlugin::SQLExecThreadOutput threadOutput;
149 	threadOutput.data=(char*) rakMalloc_Ex(bsOut.GetNumberOfBytesUsed(),__FILE__,__LINE__);
150 	memcpy(threadOutput.data,bsOut.GetData(),bsOut.GetNumberOfBytesUsed());
151 	threadOutput.length=bsOut.GetNumberOfBytesUsed();
152 	threadOutput.sender=threadInput.sender;
153 	// SendUnified(&bsOut, MEDIUM_PRIORITY,RELIABLE_ORDERED,0,packet->systemAddress,false);
154 
155 	*returnOutput=true;
156 	return threadOutput;
157 }
158 #endif // SQLite3_STATEMENT_EXECUTE_THREADED
159 
OnReceive(Packet * packet)160 PluginReceiveResult SQLite3ServerPlugin::OnReceive(Packet *packet)
161 {
162 	switch (packet->data[0])
163 	{
164 	case ID_SQLite3_EXEC:
165 		{
166 			unsigned int queryId;
167 			RakNet::RakString dbIdentifier;
168 			RakNet::RakString inputStatement;
169 			RakNet::BitStream bsIn(packet->data, packet->length, false);
170 			bsIn.IgnoreBytes(sizeof(MessageID));
171 			bsIn.Read(queryId);
172 			bsIn.Read(dbIdentifier);
173 			bsIn.Read(inputStatement);
174 			bool isRequest;
175 			bsIn.Read(isRequest);
176 			if (isRequest)
177 			{
178 				// Server code
179 
180 				DataStructures::DefaultIndexType idx = dbHandles.GetIndexOf(dbIdentifier);
181 				if (idx==-1)
182 				{
183 					RakNet::BitStream bsOut;
184 					bsOut.Write((MessageID)ID_SQLite3_UNKNOWN_DB);
185 					bsOut.Write(queryId);
186 					bsOut.Write(dbIdentifier);
187 					bsOut.Write(inputStatement);
188 					SendUnified(&bsOut, MEDIUM_PRIORITY,RELIABLE_ORDERED,0,packet->systemAddress,false);
189 				}
190 				else
191 				{
192 #ifdef SQLite3_STATEMENT_EXECUTE_THREADED
193 					// Push to the thread
194 					SQLExecThreadInput input;
195 					input.data=(char*) rakMalloc_Ex(packet->length, __FILE__,__LINE__);
196 					memcpy(input.data,packet->data,packet->length);
197 					input.dbHandle=dbHandles[idx].dbHandle;
198 					input.length=packet->length;
199 					input.sender=packet->systemAddress;
200 					sqlThreadPool.AddInput(ExecStatementThread, input);
201 #else
202 					char *errorMsg;
203 					RakNet::RakString errorMsgStr;
204 					SQLite3Table outputTable;
205 					sqlite3_exec(dbHandles[idx].dbHandle, inputStatement.C_String(), PerRowCallback, &outputTable, &errorMsg);
206 					if (errorMsg)
207 					{
208 						errorMsgStr=errorMsg;
209 						sqlite3_free(errorMsg);
210 					}
211 					RakNet::BitStream bsOut;
212 					bsOut.Write((MessageID)ID_SQLite3_EXEC);
213 					bsOut.Write(queryId);
214 					bsOut.Write(dbIdentifier);
215 					bsOut.Write(inputStatement);
216 					bsOut.Write(false);
217 					bsOut.Write(errorMsgStr);
218 					outputTable.Serialize(&bsOut);
219 					SendUnified(&bsOut, MEDIUM_PRIORITY,RELIABLE_ORDERED,0,packet->systemAddress,false);
220 #endif
221 				}
222 			}
223 			return RR_STOP_PROCESSING_AND_DEALLOCATE;
224 		}
225 		break;
226 	}
227 
228 	return RR_CONTINUE_PROCESSING;
229 }
230 
OnAttach(void)231 void SQLite3ServerPlugin::OnAttach(void)
232 {
233 }
OnDetach(void)234 void SQLite3ServerPlugin::OnDetach(void)
235 {
236 	StopThreads();
237 }
StopThreads(void)238 void SQLite3ServerPlugin::StopThreads(void)
239 {
240 #ifdef SQLite3_STATEMENT_EXECUTE_THREADED
241 	sqlThreadPool.StopThreads();
242 	unsigned int i;
243 	for (i=0; i < sqlThreadPool.InputSize(); i++)
244 	{
245 		RakNet::OP_DELETE(sqlThreadPool.GetInputAtIndex(i).data, __FILE__, __LINE__);
246 	}
247 	sqlThreadPool.ClearInput();
248 	for (i=0; i < sqlThreadPool.OutputSize(); i++)
249 	{
250 		RakNet::OP_DELETE(sqlThreadPool.GetOutputAtIndex(i).data, __FILE__, __LINE__);
251 	}
252 	sqlThreadPool.ClearOutput();
253 #endif
254 }
255