1 //////////////////////////////////////////////////////////////////////////
2 //
3 // pgAdmin III - PostgreSQL Tools
4 //
5 // Copyright (C) 2002 - 2016, The pgAdmin Development Team
6 // This software is released under the PostgreSQL Licence
7 //
8 // pgQueryThread.cpp - PostgreSQL threaded query class
9 //
10 //////////////////////////////////////////////////////////////////////////
11 
12 #include "pgAdmin3.h"
13 
14 // wxWindows headers
15 #include <wx/wx.h>
16 
17 // PostgreSQL headers
18 #include <libpq-fe.h>
19 
20 // App headers
21 #include "db/pgSet.h"
22 #include "db/pgConn.h"
23 #include "db/pgQueryThread.h"
24 #include "db/pgQueryResultEvent.h"
25 #include "utils/pgDefs.h"
26 #include "utils/sysLogger.h"
27 
28 const wxEventType PGQueryResultEvent = wxNewEventType();
29 
30 // default notice processor for the pgQueryThread
31 // we do assume that the argument passed will be always the
32 // object of pgQueryThread
pgNoticeProcessor(void * _arg,const char * _message)33 void pgNoticeProcessor(void *_arg, const char *_message)
34 {
35 	wxString str(_message, wxConvUTF8);
36 
37 	wxLogNotice(wxT("%s"), str.Trim().c_str());
38 	if (_arg)
39 	{
40 		((pgQueryThread *)_arg)->AppendMessage(str);
41 	}
42 }
43 
44 
45 // support for multiple queries support
pgQueryThread(pgConn * _conn,wxEvtHandler * _caller,PQnoticeProcessor _processor,void * _noticeHandler)46 pgQueryThread::pgQueryThread(pgConn *_conn, wxEvtHandler *_caller,
47                              PQnoticeProcessor _processor, void *_noticeHandler) :
48 	wxThread(wxTHREAD_JOINABLE), m_currIndex(-1), m_conn(_conn),
49 	m_cancelled(false), m_multiQueries(true), m_useCallable(false),
50 	m_caller(_caller), m_processor(pgNoticeProcessor), m_noticeHandler(NULL),
51 	m_eventOnCancellation(true)
52 {
53 	// check if we can really use the enterprisedb callable statement and
54 	// required
55 #ifdef __WXMSW__
56 	if (PQiGetOutResult && PQiPrepareOut && PQiSendQueryPreparedOut)
57 	{
58 		// we do not need all of pqi stuff as90 onwards
59 		if (m_conn && m_conn->conn && m_conn->GetIsEdb()
60 		        && !m_conn->EdbMinimumVersion(9, 0))
61 		{
62 			m_useCallable = true;
63 		}
64 	}
65 #else // __WXMSW__
66 #ifdef EDB_LIBPQ
67 	// use callable statement only with enterprisedb advanced servers
68 	// we do not need all of pqi stuff as90 onwards
69 	if (m_conn && m_conn->conn && m_conn->GetIsEdb()
70 	        && !m_conn->EdbMinimumVersion(9, 0))
71 	{
72 		m_useCallable = true;
73 	}
74 #endif // EDB_LIBPQ
75 #endif // __WXMSW__
76 
77 	if (m_conn && m_conn->conn)
78 	{
79 		PQsetnonblocking(m_conn->conn, 1);
80 	}
81 
82 	if (_processor != NULL)
83 	{
84 		m_processor = _processor;
85 		if (_noticeHandler)
86 			m_noticeHandler = _noticeHandler;
87 		else
88 			m_noticeHandler = (void *)this;
89 	}
90 	else
91 	{
92 		m_noticeHandler = (void *)this;
93 	}
94 }
95 
pgQueryThread(pgConn * _conn,const wxString & _qry,int _resultToRetrieve,wxWindow * _caller,long _eventId,void * _data)96 pgQueryThread::pgQueryThread(pgConn *_conn, const wxString &_qry,
97                              int _resultToRetrieve, wxWindow *_caller, long _eventId, void *_data)
98 	: wxThread(wxTHREAD_JOINABLE), m_currIndex(-1), m_conn(_conn),
99 	  m_cancelled(false), m_multiQueries(false), m_useCallable(false),
100 	  m_caller(NULL), m_processor(pgNoticeProcessor), m_noticeHandler(NULL),
101 	  m_eventOnCancellation(true)
102 {
103 	if (m_conn && m_conn->conn)
104 	{
105 		PQsetnonblocking(m_conn->conn, 1);
106 	}
107 	m_queries.Add(
108 	    new pgBatchQuery(_qry, (pgParamsArray *)NULL, _eventId, _data, false,
109 	                     _resultToRetrieve));
110 
111 	wxLogInfo(wxT("queueing : %s"), _qry.c_str());
112 	m_noticeHandler = (void *)this;
113 
114 	if (_caller)
115 	{
116 		m_caller = _caller->GetEventHandler();
117 	}
118 }
119 
SetEventOnCancellation(bool eventOnCancelled)120 void pgQueryThread::SetEventOnCancellation(bool eventOnCancelled)
121 {
122 	m_eventOnCancellation = eventOnCancelled;
123 }
124 
AddQuery(const wxString & _qry,pgParamsArray * _params,long _eventId,void * _data,bool _useCallable,int _resultToRetrieve)125 void pgQueryThread::AddQuery(const wxString &_qry, pgParamsArray *_params,
126                              long _eventId, void *_data, bool _useCallable, int _resultToRetrieve)
127 {
128 	m_queries.Add(
129 	    new pgBatchQuery(_qry, _params, _eventId, _data,
130 	                     // use callable statement only if supported
131 	                     m_useCallable && _useCallable, _resultToRetrieve));
132 
133 	wxLogInfo(wxT("queueing (%ld): %s"), GetId(), _qry.c_str());
134 }
135 
136 
~pgQueryThread()137 pgQueryThread::~pgQueryThread()
138 {
139 	m_conn->RegisterNoticeProcessor(0, 0);
140 	WX_CLEAR_ARRAY(m_queries);
141 }
142 
143 
GetMessagesAndClear(int idx)144 wxString pgQueryThread::GetMessagesAndClear(int idx)
145 {
146 	wxString msg;
147 
148 	if (idx == -1)
149 		idx = m_currIndex;
150 
151 	if (idx >= 0 && idx <= m_currIndex)
152 	{
153 		wxCriticalSectionLocker lock(m_criticalSection);
154 		msg = m_queries[idx]->m_message;
155 		m_queries[idx]->m_message = wxT("");
156 	}
157 
158 	return msg;
159 }
160 
161 
AppendMessage(const wxString & str)162 void pgQueryThread::AppendMessage(const wxString &str)
163 {
164 	if (m_queries[m_currIndex]->m_message.IsEmpty())
165 	{
166 		wxCriticalSectionLocker lock(m_criticalSection);
167 		if (str != wxT("\n"))
168 			m_queries[m_currIndex]->m_message.Append(str);
169 	}
170 	else
171 	{
172 		wxCriticalSectionLocker lock(m_criticalSection);
173 		m_queries[m_currIndex]->m_message.Append(wxT("\n")).Append(str);
174 	}
175 }
176 
177 
Execute()178 int pgQueryThread::Execute()
179 {
180 	wxMutexLocker lock(m_queriesLock);
181 
182 	PGresult       *result           = NULL;
183 	wxMBConv       &conv             = *(m_conn->conv);
184 
185 	wxString       &query            = m_queries[m_currIndex]->m_query;
186 	int            &resultToRetrieve = m_queries[m_currIndex]->m_resToRetrieve;
187 	long           &rowsInserted     = m_queries[m_currIndex]->m_rowsInserted;
188 	Oid            &insertedOid      = m_queries[m_currIndex]->m_insertedOid;
189 	// using the alias for the pointer here, in order to save the result back
190 	// in the pgBatchQuery object
191 	pgSet         *&dataSet          = m_queries[m_currIndex]->m_resultSet;
192 	int            &rc               = m_queries[m_currIndex]->m_returnCode;
193 	pgParamsArray  *params           = m_queries[m_currIndex]->m_params;
194 	bool            useCallable      = m_queries[m_currIndex]->m_useCallable;
195 	pgError        &err              = m_queries[m_currIndex]->m_err;
196 
197 	wxCharBuffer queryBuf = query.mb_str(conv);
198 
199 	if (PQstatus(m_conn->conn) != CONNECTION_OK)
200 	{
201 		rc = pgQueryResultEvent::PGQ_CONN_LOST;
202 		err.msg_primary = _("Connection to the database server lost");
203 
204 		return(RaiseEvent(rc));
205 	}
206 
207 	if (!queryBuf && !query.IsEmpty())
208 	{
209 		rc = pgQueryResultEvent::PGQ_STRING_INVALID;
210 		m_conn->SetLastResultError(NULL, _("the query could not be converted to the required encoding."));
211 		err.msg_primary = _("Query string is empty");
212 
213 		return(RaiseEvent(rc));
214 	}
215 
216 	// Honour the parameters (if any)
217 	if (params && params->GetCount() > 0)
218 	{
219 		int    pCount = params->GetCount();
220 		int    ret    = 0,
221 		       idx    = 0;
222 
223 		Oid         *pOids    = (Oid *)malloc(pCount * sizeof(Oid));
224 		const char **pParams  = (const char **)malloc(pCount * sizeof(const char *));
225 		int         *pLens    = (int *)malloc(pCount * sizeof(int));
226 		int         *pFormats = (int *)malloc(pCount * sizeof(int));
227 		// modes are used only by enterprisedb callable statement
228 #if defined (__WXMSW__) || (EDB_LIBPQ)
229 		int         *pModes   = (int *)malloc(pCount * sizeof(int));
230 #endif
231 
232 		for (; idx < pCount; idx++)
233 		{
234 			pgParam *param = (*params)[idx];
235 
236 			pOids[idx] = param->m_type;
237 			pParams[idx] = (const char *)param->m_val;
238 			pLens[idx] = param->m_len;
239 			pFormats[idx] = param->GetFormat();
240 #if defined (__WXMSW__) || (EDB_LIBPQ)
241 			pModes[idx] = param->m_mode;
242 #endif
243 		}
244 
245 		if (useCallable)
246 		{
247 #if defined (__WXMSW__) || (EDB_LIBPQ)
248 			wxLogInfo(wxString::Format(
249 			              _("using an enterprisedb callable statement (queryid:%ld, threadid:%ld)"),
250 			              (long)m_currIndex, (long)GetId()));
251 			wxString stmt = wxString::Format(wxT("pgQueryThread-%ld-%ld"), this->GetId(), m_currIndex);
252 			PGresult *res = PQiPrepareOut(m_conn->conn, stmt.mb_str(wxConvUTF8),
253 			                              queryBuf, pCount, pOids, pModes);
254 
255 			if( PQresultStatus(res) != PGRES_COMMAND_OK)
256 			{
257 				rc = pgQueryResultEvent::PGQ_ERROR_PREPARE_CALLABLE;
258 				err.SetError(res, &conv);
259 
260 				PQclear(res);
261 
262 				goto return_with_error;
263 			}
264 
265 			ret = PQiSendQueryPreparedOut(m_conn->conn, stmt.mb_str(wxConvUTF8),
266 			                              pCount, pParams, pLens, pFormats, 1);
267 
268 			if (ret != 1)
269 			{
270 				rc = pgQueryResultEvent::PGQ_ERROR_EXECUTE_CALLABLE;
271 
272 				m_conn->SetLastResultError(NULL, _("Failed to run PQsendQuery in pgQueryThread"));
273 				err.msg_primary = wxString(PQerrorMessage(m_conn->conn), conv);
274 
275 				PQclear(res);
276 				res = NULL;
277 
278 				goto return_with_error;
279 			}
280 
281 			PQclear(res);
282 			res = NULL;
283 #else
284 			rc = -1;
285 			wxASSERT_MSG(false,
286 			             _("the program execution flow must not reach to this point in pgQueryThread"));
287 
288 			goto return_with_error;
289 #endif
290 		}
291 		else
292 		{
293 			// assumptions: we will need the results in text format only
294 			ret = PQsendQueryParams(m_conn->conn, queryBuf, pCount, pOids, pParams, pLens, pFormats, 0);
295 
296 			if (ret != 1)
297 			{
298 				rc = pgQueryResultEvent::PGQ_ERROR_SEND_QUERY;
299 
300 				m_conn->SetLastResultError(NULL,
301 				                           _("Failed to run PQsendQueryParams in pgQueryThread"));
302 
303 				err.msg_primary = _("Failed to run PQsendQueryParams in pgQueryThread.\n") +
304 				                  wxString(PQerrorMessage(m_conn->conn), conv);
305 
306 				goto return_with_error;
307 			}
308 		}
309 		goto continue_without_error;
310 
311 return_with_error:
312 		{
313 			free(pOids);
314 			free(pParams);
315 			free(pLens);
316 			free(pFormats);
317 #if defined (__WXMSW__) || (EDB_LIBPQ)
318 			free(pModes);
319 #endif
320 			return (RaiseEvent(rc));
321 		}
322 	}
323 	else
324 	{
325 		// use the PQsendQuery api in case, we don't have any parameters to
326 		// pass to the server
327 		if (!PQsendQuery(m_conn->conn, queryBuf))
328 		{
329 			rc = pgQueryResultEvent::PGQ_ERROR_SEND_QUERY;
330 
331 			err.msg_primary = _("Failed to run PQsendQueryParams in pgQueryThread.\n") +
332 			                  wxString(PQerrorMessage(m_conn->conn), conv);
333 
334 			return(RaiseEvent(rc));
335 		}
336 	}
337 
338 continue_without_error:
339 	int resultsRetrieved = 0;
340 	PGresult *lastResult = 0;
341 	bool connExecutionCancelled = false;
342 
343 	while (true)
344 	{
345 		// This is a 'joinable' thread, it is not advisable to call 'delete'
346 		// function on this.
347 		// Hence - it does not make sense to use the function 'testdestroy' here.
348 		// We introduced the 'CancelExecution' function for the same purpose.
349 		//
350 		// We will have to call the CancelExecution function of pgConn to
351 		// inform the backend that the user has cancelled the execution.
352 		//
353 		// We will need to consume all the results before quiting from the thread.
354 		// Otherwise - the connection object will become unusable, and throw
355 		// error - because libpq connections expects application to consume all
356 		// the result, before executing another query
357 		//
358 		if (m_cancelled && rc != pgQueryResultEvent::PGQ_EXECUTION_CANCELLED)
359 		{
360 			// We shouldn't be calling cancellation multiple time
361 			if (!connExecutionCancelled)
362 			{
363 				m_conn->CancelExecution();
364 				connExecutionCancelled = true;
365 			}
366 			rc = pgQueryResultEvent::PGQ_EXECUTION_CANCELLED;
367 
368 			err.msg_primary = _("Execution Cancelled");
369 
370 			if (lastResult)
371 			{
372 				PQclear(lastResult);
373 				lastResult = NULL;
374 			}
375 		}
376 
377 		if ((rc = PQconsumeInput(m_conn->conn)) != 1)
378 		{
379 			if (m_cancelled)
380 			{
381 				rc = pgQueryResultEvent::PGQ_EXECUTION_CANCELLED;
382 				err.msg_primary = _("Execution Cancelled");
383 
384 				if (lastResult)
385 				{
386 					PQclear(lastResult);
387 					lastResult = NULL;
388 				}
389 				// There is nothing more to consume.
390 				// We can quit the thread now.
391 				//
392 				// Raise the event - if the component asked for it on
393 				// execution cancellation.
394 				if (m_eventOnCancellation)
395 					RaiseEvent(rc);
396 
397 				return rc;
398 			}
399 
400 			if (rc == 0)
401 			{
402 				err.msg_primary = wxString(PQerrorMessage(m_conn->conn), conv);
403 			}
404 			if (PQstatus(m_conn->conn) == CONNECTION_BAD)
405 			{
406 				err.msg_primary = _("Connection to the database server lost");
407 				rc = pgQueryResultEvent::PGQ_CONN_LOST;
408 			}
409 			else
410 			{
411 				rc = pgQueryResultEvent::PGQ_ERROR_CONSUME_INPUT;
412 			}
413 
414 			return(RaiseEvent(rc));
415 		}
416 
417 		if (PQisBusy(m_conn->conn))
418 		{
419 			Yield();
420 			this->Sleep(10);
421 
422 			continue;
423 		}
424 
425 		// if resultToRetrieve is given, the nth result will be returned,
426 		// otherwise the last result set will be returned.
427 		// all others are discarded
428 		PGresult *res = PQgetResult(m_conn->conn);
429 
430 		if (!res)
431 			break;
432 
433 		if((PQresultStatus(res) == PGRES_NONFATAL_ERROR) ||
434 		        (PQresultStatus(res) == PGRES_FATAL_ERROR) ||
435 		        (PQresultStatus(res) == PGRES_BAD_RESPONSE))
436 		{
437 			result = res;
438 			err.SetError(res, &conv);
439 
440 			// Wait for the execution to be finished
441 			// We need to fetch all the results, before sending the error
442 			// message
443 			do
444 			{
445 				if (PQconsumeInput(m_conn->conn) != 1)
446 				{
447 					if (m_cancelled)
448 					{
449 						rc = pgQueryResultEvent::PGQ_EXECUTION_CANCELLED;
450 
451 						// Release the result as the query execution has been cancelled by the
452 						// user
453 						if (result)
454 							PQclear(result);
455 
456 						result = NULL;
457 
458 						if (m_eventOnCancellation)
459 							RaiseEvent(rc);
460 
461 						return rc;
462 					}
463 					goto out_of_consume_input_loop;
464 				}
465 
466 				if ((res = PQgetResult(m_conn->conn)) == NULL)
467 				{
468 					goto out_of_consume_input_loop;
469 				}
470 
471 				// Release the temporary results
472 				PQclear(res);
473 				res = NULL;
474 
475 				if (PQisBusy(m_conn->conn))
476 				{
477 					Yield();
478 					this->Sleep(10);
479 				}
480 			}
481 			while (true);
482 
483 			break;
484 		}
485 
486 #if defined (__WXMSW__) || (EDB_LIBPQ)
487 		// there should be 2 results in the callable statement - the first is the
488 		// dummy, the second contains our out params.
489 		if (useCallable)
490 		{
491 			PQclear(res);
492 			result = PQiGetOutResult(m_conn->conn);
493 		}
494 #endif
495 		if (PQresultStatus(res) == PGRES_COPY_IN)
496 		{
497 			rc = PGRES_COPY_IN;
498 			PQputCopyEnd(m_conn->conn, "not supported by pgadmin");
499 		}
500 
501 		if (PQresultStatus(res) == PGRES_COPY_OUT)
502 		{
503 			int copyRc;
504 			char *buf;
505 			int copyRows = 0;
506 			int lastCopyRc = 0;
507 
508 			rc = PGRES_COPY_OUT;
509 
510 			AppendMessage(_("query returned copy data:\n"));
511 
512 			while((copyRc = PQgetCopyData(m_conn->conn, &buf, 1)) >= 0)
513 			{
514 				// Ignore processing the query result, when it has already been
515 				// cancelled by the user
516 				if (m_cancelled)
517 				{
518 					if (!connExecutionCancelled)
519 					{
520 						m_conn->CancelExecution();
521 						connExecutionCancelled = true;
522 					}
523 					continue;
524 				}
525 
526 				if (buf != NULL)
527 				{
528 					if (copyRows < 100)
529 					{
530 						wxString str(buf, conv);
531 						wxCriticalSectionLocker cs(m_criticalSection);
532 						m_queries[m_currIndex]->m_message.Append(str);
533 
534 					}
535 					else if (copyRows == 100)
536 						AppendMessage(_("Query returned more than 100 copy rows, discarding the rest...\n"));
537 
538 					PQfreemem(buf);
539 				}
540 				if (copyRc > 0)
541 					copyRows++;
542 
543 				if (lastCopyRc == 0 && copyRc == 0)
544 				{
545 					Yield();
546 					this->Sleep(10);
547 				}
548 				if (copyRc == 0)
549 				{
550 					if (!PQconsumeInput(m_conn->conn))
551 					{
552 						// It might be the case - it is a result of the
553 						// execution cancellation.
554 						if (m_cancelled)
555 						{
556 							rc = pgQueryResultEvent::PGQ_EXECUTION_CANCELLED;
557 
558 							// Release the result as the query execution has been cancelled by the
559 							// user
560 							if (result)
561 								PQclear(result);
562 
563 							result = NULL;
564 
565 							if (m_eventOnCancellation)
566 								RaiseEvent(rc);
567 
568 							return rc;
569 						}
570 						if (PQstatus(m_conn->conn) == CONNECTION_BAD)
571 						{
572 							err.msg_primary = _("Connection to the database server lost");
573 							rc = pgQueryResultEvent::PGQ_CONN_LOST;
574 						}
575 						else
576 						{
577 							rc = pgQueryResultEvent::PGQ_ERROR_CONSUME_INPUT;
578 
579 							err.msg_primary = wxString(PQerrorMessage(m_conn->conn), conv);
580 						}
581 						return(RaiseEvent(rc));
582 					}
583 				}
584 				lastCopyRc = copyRc;
585 			}
586 
587 			res = PQgetResult(m_conn->conn);
588 
589 			if (!res)
590 				break;
591 		}
592 
593 		resultsRetrieved++;
594 
595 		// Save the current result, as asked by the component
596 		// But - only if the execution is not cancelled
597 		if (!m_cancelled && resultsRetrieved == resultToRetrieve)
598 		{
599 			result = res;
600 			insertedOid = PQoidValue(res);
601 			if (insertedOid && insertedOid != (Oid) - 1)
602 				AppendMessage(wxString::Format(_("query inserted one row with oid %d.\n"), insertedOid));
603 			else
604 				AppendMessage(wxString::Format(wxPLURAL("query result with %d row will be returned.\n", "query result with %d rows will be returned.\n",
605 				                                        PQntuples(result)), PQntuples(result)));
606 			continue;
607 		}
608 
609 		if (lastResult)
610 		{
611 			if (!m_cancelled && PQntuples(lastResult))
612 				AppendMessage(wxString::Format(wxPLURAL("query result with %d row discarded.\n", "query result with %d rows discarded.\n",
613 				                                        PQntuples(lastResult)), PQntuples(lastResult)));
614 			PQclear(lastResult);
615 		}
616 		lastResult = res;
617 	}
618 
619 out_of_consume_input_loop:
620 	if (m_cancelled)
621 	{
622 		rc = pgQueryResultEvent::PGQ_EXECUTION_CANCELLED;
623 
624 		// Release the result as the query execution has been cancelled by the
625 		// user
626 		if (result)
627 			PQclear(result);
628 
629 		result = NULL;
630 
631 		if (m_eventOnCancellation)
632 			RaiseEvent(rc);
633 
634 		return rc;
635 	}
636 
637 	if (!result)
638 		result = lastResult;
639 
640 	err.SetError(result, &conv);
641 
642 	AppendMessage(wxT("\n"));
643 
644 	rc = PQresultStatus(result);
645 	if (rc == PGRES_TUPLES_OK)
646 	{
647 		dataSet = new pgSet(result, m_conn, conv, m_conn->needColQuoting);
648 		dataSet->MoveFirst();
649 	}
650 	else if (rc == PGRES_COMMAND_OK)
651 	{
652 		char *s = PQcmdTuples(result);
653 		if (*s)
654 			rowsInserted = atol(s);
655 	}
656 	else if (rc == PGRES_FATAL_ERROR ||
657 	         rc == PGRES_NONFATAL_ERROR ||
658 	         rc == PGRES_BAD_RESPONSE)
659 	{
660 		if (result)
661 		{
662 			AppendMessage(wxString(PQresultErrorMessage(result), conv));
663 			PQclear(result);
664 			result = NULL;
665 		}
666 		else
667 		{
668 			AppendMessage(wxString(PQerrorMessage(m_conn->conn), conv));
669 		}
670 
671 		return(RaiseEvent(rc));
672 	}
673 
674 	insertedOid = PQoidValue(result);
675 	if (insertedOid == (Oid) - 1)
676 		insertedOid = 0;
677 
678 	return(RaiseEvent(1));
679 }
680 
RaiseEvent(int _retval)681 int pgQueryThread::RaiseEvent(int _retval)
682 {
683 #if !defined(PGSCLI)
684 	if (m_caller)
685 	{
686 		pgQueryResultEvent resultEvent(GetId(), m_queries[m_currIndex], m_queries[m_currIndex]->m_eventID);
687 
688 		// client data
689 		resultEvent.SetClientData(m_queries[m_currIndex]->m_data);
690 		resultEvent.SetInt(_retval);
691 
692 		m_caller->AddPendingEvent(resultEvent);
693 	}
694 #endif
695 	return _retval;
696 }
697 
698 
Entry()699 void *pgQueryThread::Entry()
700 {
701 	do
702 	{
703 		if (m_currIndex < (((int)m_queries.GetCount()) - 1))
704 		{
705 			// Create the PGcancel object to enable cancelling the running
706 			// query
707 			m_conn->SetConnCancel();
708 			m_currIndex++;
709 
710 			m_queries[m_currIndex]->m_returnCode = -2;
711 			m_queries[m_currIndex]->m_rowsInserted = -1l;
712 
713 			wxLogSql(wxT("Thread executing query (%d:%s:%d): %s"),
714 			         m_currIndex + 1, m_conn->GetHost().c_str(), m_conn->GetPort(),
715 			         m_queries[m_currIndex]->m_query.c_str());
716 
717 			// register the notice processor for the current query
718 			m_conn->RegisterNoticeProcessor(m_processor, m_noticeHandler);
719 
720 			// execute the current query now
721 			Execute();
722 
723 			// remove the notice processor now
724 			m_conn->RegisterNoticeProcessor(0, 0);
725 
726 			// reset the PGcancel object
727 			m_conn->ResetConnCancel();
728 		}
729 
730 		if (!m_multiQueries || m_cancelled)
731 			break;
732 
733 		wxThread::Sleep(10);
734 	}
735 	while (true);
736 
737 	return(NULL);
738 }
739 
DeleteReleasedQueries()740 int pgQueryThread::DeleteReleasedQueries()
741 {
742 	int res = 0,
743 	    idx = 0;
744 
745 	if (m_queriesLock.TryLock() == wxMUTEX_BUSY)
746 		return res;
747 
748 	for (; idx <= m_currIndex; idx++)
749 	{
750 		if (m_queries[idx]->m_resultSet != NULL)
751 		{
752 			pgSet *set = m_queries[idx]->m_resultSet;
753 			m_queries[idx]->m_resultSet = NULL;
754 			delete set;
755 			set = NULL;
756 
757 			res++;
758 		}
759 	}
760 	m_queriesLock.Unlock();
761 
762 	return res;
763 }
764 
765 
GetResultError(int idx)766 pgError pgQueryThread::GetResultError(int idx)
767 {
768 	wxMutexLocker lock(m_queriesLock);
769 
770 	if (idx == -1)
771 		idx = m_currIndex;
772 
773 	return m_queries[idx]->m_err;
774 }
775 
776 
GetErrorMessage()777 const wxString &pgBatchQuery::GetErrorMessage()
778 {
779 	return m_err.msg_primary;
780 }
781 
~pgBatchQuery()782 pgBatchQuery::~pgBatchQuery()
783 {
784 	if (m_resultSet)
785 	{
786 		delete m_resultSet;
787 		m_resultSet = NULL;
788 	}
789 
790 	if (m_params)
791 	{
792 		WX_CLEAR_ARRAY((*m_params));
793 		delete m_params;
794 		m_params = NULL;
795 	}
796 }
797 
Release()798 bool pgBatchQuery::Release()
799 {
800 	bool res = false;
801 
802 	if (m_resultSet != NULL)
803 	{
804 		res = true;
805 
806 		pgSet *set = m_resultSet;
807 		m_resultSet = NULL;
808 
809 		if (set)
810 			delete set;
811 		set = NULL;
812 	}
813 
814 	if (m_params)
815 	{
816 		res = true;
817 
818 		WX_CLEAR_ARRAY((*m_params));
819 		delete m_params;
820 		m_params = NULL;
821 	}
822 
823 	return res;
824 }
825 
pgQueryResultEvent(unsigned long _thrdId,pgBatchQuery * _qry,int _id)826 pgQueryResultEvent::pgQueryResultEvent(
827     unsigned long _thrdId, pgBatchQuery *_qry, int _id) :
828 	wxCommandEvent(PGQueryResultEvent, _id), m_thrdId(_thrdId),
829 	m_query(_qry) { }
830 
pgQueryResultEvent(const pgQueryResultEvent & _ev)831 pgQueryResultEvent::pgQueryResultEvent(const pgQueryResultEvent &_ev)
832 	: wxCommandEvent(_ev), m_thrdId(_ev.m_thrdId), m_query(_ev.m_query) { }
833 
834 
pgParam(Oid _type,void * _val,int _len,short _mode)835 pgParam::pgParam(Oid _type, void *_val, int _len, short _mode)
836 	: m_type(_type), m_val(_val), m_len(_len), m_mode(_mode)
837 {
838 	switch(m_type)
839 	{
840 		case PGOID_TYPE_CHAR:
841 		case PGOID_TYPE_NAME:
842 		case PGOID_TYPE_TEXT:
843 		case PGOID_TYPE_VARCHAR:
844 		case PGOID_TYPE_CSTRING:
845 			m_format = 0;
846 		default:
847 			m_format = 1;
848 	}
849 }
850 
851 // wxString data
pgParam(Oid _oid,wxString * _val,wxMBConv * _conv,short _mode)852 pgParam::pgParam(Oid _oid, wxString *_val, wxMBConv *_conv, short _mode)
853 	: m_mode(_mode)
854 {
855 	if (m_mode == PG_PARAM_OUT || !_val)
856 	{
857 		m_len = 0;
858 	}
859 	else
860 	{
861 		m_len = _val->Len();
862 	}
863 	if (_val)
864 	{
865 		char *str = (char *)malloc(m_len + 1);
866 		if (!_val->IsEmpty() && _mode != PG_PARAM_OUT)
867 		{
868 			strncpy(str,
869 			        (const char *)_val->mb_str(
870 			            *(_conv != NULL ? _conv : &wxConvLocal)), m_len);
871 			str[m_len] = '\0';
872 		}
873 		else
874 		{
875 			str[0] = '\0';
876 		}
877 		m_val = (void *)(str);
878 	}
879 	else
880 	{
881 		m_val = NULL;
882 	}
883 	m_type = _oid;
884 
885 	// text format
886 	m_format = 0;
887 }
888 
889 
~pgParam()890 pgParam::~pgParam()
891 {
892 	if (m_val)
893 		free(m_val);
894 	m_val = NULL;
895 }
896 
897 
GetFormat()898 int pgParam::GetFormat()
899 {
900 	return m_format;
901 }
902