1 #include "SQLite3ServerPlugin.h"
2 #include "MessageIdentifiers.h"
3 #include "BitStream.h"
4 #include "GetTime.h"
5
6 using namespace RakNet;
7
operator <(const DataStructures::MLKeyRef<RakNet::RakString> & inputKey,const SQLite3ServerPlugin::NamedDBHandle & cls)8 bool operator<( const DataStructures::MLKeyRef<RakNet::RakString> &inputKey, const SQLite3ServerPlugin::NamedDBHandle &cls ) {return inputKey.Get() < cls.dbIdentifier;}
operator >(const DataStructures::MLKeyRef<RakNet::RakString> & inputKey,const SQLite3ServerPlugin::NamedDBHandle & cls)9 bool operator>( const DataStructures::MLKeyRef<RakNet::RakString> &inputKey, const SQLite3ServerPlugin::NamedDBHandle &cls ) {return inputKey.Get() > cls.dbIdentifier;}
operator ==(const DataStructures::MLKeyRef<RakNet::RakString> & inputKey,const SQLite3ServerPlugin::NamedDBHandle & cls)10 bool operator==( const DataStructures::MLKeyRef<RakNet::RakString> &inputKey, const SQLite3ServerPlugin::NamedDBHandle &cls ) {return inputKey.Get() == cls.dbIdentifier;}
11
12
PerRowCallback(void * userArgument,int argc,char ** argv,char ** azColName)13 int PerRowCallback(void *userArgument, int argc, char **argv, char **azColName)
14 {
15 SQLite3Table *outputTable = (SQLite3Table*)userArgument;
16 DataStructures::DefaultIndexType idx;
17 if (outputTable->columnNames.GetSize()==0)
18 {
19 for (idx=0; idx < (DataStructures::DefaultIndexType) argc; idx++)
20 outputTable->columnNames.Push(azColName[idx], __FILE__, __LINE__ );
21 }
22 SQLite3Row *row = RakNet::OP_NEW<SQLite3Row>(__FILE__,__LINE__);
23 outputTable->rows.Push(row,__FILE__,__LINE__);
24 for (idx=0; idx < (DataStructures::DefaultIndexType) argc; idx++)
25 {
26 if (argv[idx])
27 row->entries.Push(argv[idx], __FILE__, __LINE__ );
28 else
29 row->entries.Push("", __FILE__, __LINE__ );
30 }
31 return 0;
32 }
SQLite3ServerPlugin()33 SQLite3ServerPlugin::SQLite3ServerPlugin()
34 {
35 }
~SQLite3ServerPlugin()36 SQLite3ServerPlugin::~SQLite3ServerPlugin()
37 {
38 StopThreads();
39 }
AddDBHandle(RakNet::RakString dbIdentifier,sqlite3 * dbHandle,bool dbAutoCreated)40 bool SQLite3ServerPlugin::AddDBHandle(RakNet::RakString dbIdentifier, sqlite3 *dbHandle, bool dbAutoCreated)
41 {
42 if (dbIdentifier.IsEmpty())
43 return false;
44 DataStructures::DefaultIndexType idx = dbHandles.GetInsertionIndex(dbIdentifier);
45 if (idx==(DataStructures::DefaultIndexType)-1)
46 return false;
47 NamedDBHandle ndbh;
48 ndbh.dbHandle=dbHandle;
49 ndbh.dbIdentifier=dbIdentifier;
50 ndbh.dbAutoCreated=dbAutoCreated;
51 ndbh.whenCreated=RakNet::GetTimeMS();
52 dbHandles.InsertAtIndex(ndbh,idx,__FILE__,__LINE__);
53
54 #ifdef SQLite3_STATEMENT_EXECUTE_THREADED
55 if (sqlThreadPool.WasStarted()==false)
56 sqlThreadPool.StartThreads(1,0);
57 #endif
58
59 return true;
60 }
RemoveDBHandle(RakNet::RakString dbIdentifier,bool alsoCloseConnection)61 void SQLite3ServerPlugin::RemoveDBHandle(RakNet::RakString dbIdentifier, bool alsoCloseConnection)
62 {
63 DataStructures::DefaultIndexType idx = dbHandles.GetIndexOf(dbIdentifier);
64 if (idx!=(DataStructures::DefaultIndexType)-1)
65 {
66 if (alsoCloseConnection)
67 {
68 printf("Closed %s\n", dbIdentifier.C_String());
69 sqlite3_close(dbHandles[idx].dbHandle);
70 }
71 dbHandles.RemoveAtIndex(idx,__FILE__,__LINE__);
72 #ifdef SQLite3_STATEMENT_EXECUTE_THREADED
73 if (dbHandles.GetSize()==0)
74 StopThreads();
75 #endif // SQLite3_STATEMENT_EXECUTE_THREADED
76 }
77 }
RemoveDBHandle(sqlite3 * dbHandle,bool alsoCloseConnection)78 void SQLite3ServerPlugin::RemoveDBHandle(sqlite3 *dbHandle, bool alsoCloseConnection)
79 {
80 DataStructures::DefaultIndexType idx;
81 for (idx=0; idx < dbHandles.GetSize(); idx++)
82 {
83 if (dbHandles[idx].dbHandle==dbHandle)
84 {
85 if (alsoCloseConnection)
86 {
87 printf("Closed %s\n", dbHandles[idx].dbIdentifier.C_String());
88 sqlite3_close(dbHandles[idx].dbHandle);
89 }
90 dbHandles.RemoveAtIndex(idx,__FILE__,__LINE__);
91 #ifdef SQLite3_STATEMENT_EXECUTE_THREADED
92 if (dbHandles.GetSize()==0)
93 StopThreads();
94 #endif // SQLite3_STATEMENT_EXECUTE_THREADED
95 return;
96 }
97 }
98 }
99 #ifdef SQLite3_STATEMENT_EXECUTE_THREADED
Update(void)100 void SQLite3ServerPlugin::Update(void)
101 {
102 SQLExecThreadOutput output;
103 while (sqlThreadPool.HasOutputFast() && sqlThreadPool.HasOutput())
104 {
105 output = sqlThreadPool.GetOutput();
106 RakNet::BitStream bsOut((unsigned char*) output.data, output.length,false);
107 SendUnified(&bsOut, MEDIUM_PRIORITY,RELIABLE_ORDERED,0,output.sender,false);
108 rakFree_Ex(output.data,__FILE__,__LINE__);
109 }
110 }
ExecStatementThread(SQLite3ServerPlugin::SQLExecThreadInput threadInput,bool * returnOutput,void * perThreadData)111 SQLite3ServerPlugin::SQLExecThreadOutput ExecStatementThread(SQLite3ServerPlugin::SQLExecThreadInput threadInput, bool *returnOutput, void* perThreadData)
112 {
113 unsigned int queryId;
114 RakNet::RakString dbIdentifier;
115 RakNet::RakString inputStatement;
116 RakNet::BitStream bsIn((unsigned char*) threadInput.data, threadInput.length, false);
117 bsIn.IgnoreBytes(sizeof(MessageID));
118 bsIn.Read(queryId);
119 bsIn.Read(dbIdentifier);
120 bsIn.Read(inputStatement);
121 // bool isRequest;
122 // bsIn.Read(isRequest);
123 bsIn.IgnoreBits(1);
124
125 char *errorMsg;
126 RakNet::RakString errorMsgStr;
127 SQLite3Table outputTable;
128 sqlite3_exec(threadInput.dbHandle, inputStatement.C_String(), PerRowCallback, &outputTable, &errorMsg);
129 if (errorMsg)
130 {
131 errorMsgStr=errorMsg;
132 sqlite3_free(errorMsg);
133 }
134
135 RakNet::BitStream bsOut;
136 bsOut.Write((MessageID)ID_SQLite3_EXEC);
137 bsOut.Write(queryId);
138 bsOut.Write(dbIdentifier);
139 bsOut.Write(inputStatement);
140 bsOut.Write(false);
141 bsOut.Write(errorMsgStr);
142 outputTable.Serialize(&bsOut);
143
144 // Free input data
145 rakFree_Ex(threadInput.data,__FILE__,__LINE__);
146
147 // Copy to output data
148 SQLite3ServerPlugin::SQLExecThreadOutput threadOutput;
149 threadOutput.data=(char*) rakMalloc_Ex(bsOut.GetNumberOfBytesUsed(),__FILE__,__LINE__);
150 memcpy(threadOutput.data,bsOut.GetData(),bsOut.GetNumberOfBytesUsed());
151 threadOutput.length=bsOut.GetNumberOfBytesUsed();
152 threadOutput.sender=threadInput.sender;
153 // SendUnified(&bsOut, MEDIUM_PRIORITY,RELIABLE_ORDERED,0,packet->systemAddress,false);
154
155 *returnOutput=true;
156 return threadOutput;
157 }
158 #endif // SQLite3_STATEMENT_EXECUTE_THREADED
159
OnReceive(Packet * packet)160 PluginReceiveResult SQLite3ServerPlugin::OnReceive(Packet *packet)
161 {
162 switch (packet->data[0])
163 {
164 case ID_SQLite3_EXEC:
165 {
166 unsigned int queryId;
167 RakNet::RakString dbIdentifier;
168 RakNet::RakString inputStatement;
169 RakNet::BitStream bsIn(packet->data, packet->length, false);
170 bsIn.IgnoreBytes(sizeof(MessageID));
171 bsIn.Read(queryId);
172 bsIn.Read(dbIdentifier);
173 bsIn.Read(inputStatement);
174 bool isRequest;
175 bsIn.Read(isRequest);
176 if (isRequest)
177 {
178 // Server code
179
180 DataStructures::DefaultIndexType idx = dbHandles.GetIndexOf(dbIdentifier);
181 if (idx==-1)
182 {
183 RakNet::BitStream bsOut;
184 bsOut.Write((MessageID)ID_SQLite3_UNKNOWN_DB);
185 bsOut.Write(queryId);
186 bsOut.Write(dbIdentifier);
187 bsOut.Write(inputStatement);
188 SendUnified(&bsOut, MEDIUM_PRIORITY,RELIABLE_ORDERED,0,packet->systemAddress,false);
189 }
190 else
191 {
192 #ifdef SQLite3_STATEMENT_EXECUTE_THREADED
193 // Push to the thread
194 SQLExecThreadInput input;
195 input.data=(char*) rakMalloc_Ex(packet->length, __FILE__,__LINE__);
196 memcpy(input.data,packet->data,packet->length);
197 input.dbHandle=dbHandles[idx].dbHandle;
198 input.length=packet->length;
199 input.sender=packet->systemAddress;
200 sqlThreadPool.AddInput(ExecStatementThread, input);
201 #else
202 char *errorMsg;
203 RakNet::RakString errorMsgStr;
204 SQLite3Table outputTable;
205 sqlite3_exec(dbHandles[idx].dbHandle, inputStatement.C_String(), PerRowCallback, &outputTable, &errorMsg);
206 if (errorMsg)
207 {
208 errorMsgStr=errorMsg;
209 sqlite3_free(errorMsg);
210 }
211 RakNet::BitStream bsOut;
212 bsOut.Write((MessageID)ID_SQLite3_EXEC);
213 bsOut.Write(queryId);
214 bsOut.Write(dbIdentifier);
215 bsOut.Write(inputStatement);
216 bsOut.Write(false);
217 bsOut.Write(errorMsgStr);
218 outputTable.Serialize(&bsOut);
219 SendUnified(&bsOut, MEDIUM_PRIORITY,RELIABLE_ORDERED,0,packet->systemAddress,false);
220 #endif
221 }
222 }
223 return RR_STOP_PROCESSING_AND_DEALLOCATE;
224 }
225 break;
226 }
227
228 return RR_CONTINUE_PROCESSING;
229 }
230
OnAttach(void)231 void SQLite3ServerPlugin::OnAttach(void)
232 {
233 }
OnDetach(void)234 void SQLite3ServerPlugin::OnDetach(void)
235 {
236 StopThreads();
237 }
StopThreads(void)238 void SQLite3ServerPlugin::StopThreads(void)
239 {
240 #ifdef SQLite3_STATEMENT_EXECUTE_THREADED
241 sqlThreadPool.StopThreads();
242 unsigned int i;
243 for (i=0; i < sqlThreadPool.InputSize(); i++)
244 {
245 RakNet::OP_DELETE(sqlThreadPool.GetInputAtIndex(i).data, __FILE__, __LINE__);
246 }
247 sqlThreadPool.ClearInput();
248 for (i=0; i < sqlThreadPool.OutputSize(); i++)
249 {
250 RakNet::OP_DELETE(sqlThreadPool.GetOutputAtIndex(i).data, __FILE__, __LINE__);
251 }
252 sqlThreadPool.ClearOutput();
253 #endif
254 }
255