1 //////////////////////////////////////////////////////////////////////////
2 //
3 // pgAdmin III - PostgreSQL Tools
4 //
5 // Copyright (C) 2002 - 2016, The pgAdmin Development Team
6 // This software is released under the PostgreSQL Licence
7 //
8 // pgQueryThread.cpp - PostgreSQL threaded query class
9 //
10 //////////////////////////////////////////////////////////////////////////
11
12 #include "pgAdmin3.h"
13
14 // wxWindows headers
15 #include <wx/wx.h>
16
17 // PostgreSQL headers
18 #include <libpq-fe.h>
19
20 // App headers
21 #include "db/pgSet.h"
22 #include "db/pgConn.h"
23 #include "db/pgQueryThread.h"
24 #include "db/pgQueryResultEvent.h"
25 #include "utils/pgDefs.h"
26 #include "utils/sysLogger.h"
27
28 const wxEventType PGQueryResultEvent = wxNewEventType();
29
30 // default notice processor for the pgQueryThread
31 // we do assume that the argument passed will be always the
32 // object of pgQueryThread
pgNoticeProcessor(void * _arg,const char * _message)33 void pgNoticeProcessor(void *_arg, const char *_message)
34 {
35 wxString str(_message, wxConvUTF8);
36
37 wxLogNotice(wxT("%s"), str.Trim().c_str());
38 if (_arg)
39 {
40 ((pgQueryThread *)_arg)->AppendMessage(str);
41 }
42 }
43
44
45 // support for multiple queries support
pgQueryThread(pgConn * _conn,wxEvtHandler * _caller,PQnoticeProcessor _processor,void * _noticeHandler)46 pgQueryThread::pgQueryThread(pgConn *_conn, wxEvtHandler *_caller,
47 PQnoticeProcessor _processor, void *_noticeHandler) :
48 wxThread(wxTHREAD_JOINABLE), m_currIndex(-1), m_conn(_conn),
49 m_cancelled(false), m_multiQueries(true), m_useCallable(false),
50 m_caller(_caller), m_processor(pgNoticeProcessor), m_noticeHandler(NULL),
51 m_eventOnCancellation(true)
52 {
53 // check if we can really use the enterprisedb callable statement and
54 // required
55 #ifdef __WXMSW__
56 if (PQiGetOutResult && PQiPrepareOut && PQiSendQueryPreparedOut)
57 {
58 // we do not need all of pqi stuff as90 onwards
59 if (m_conn && m_conn->conn && m_conn->GetIsEdb()
60 && !m_conn->EdbMinimumVersion(9, 0))
61 {
62 m_useCallable = true;
63 }
64 }
65 #else // __WXMSW__
66 #ifdef EDB_LIBPQ
67 // use callable statement only with enterprisedb advanced servers
68 // we do not need all of pqi stuff as90 onwards
69 if (m_conn && m_conn->conn && m_conn->GetIsEdb()
70 && !m_conn->EdbMinimumVersion(9, 0))
71 {
72 m_useCallable = true;
73 }
74 #endif // EDB_LIBPQ
75 #endif // __WXMSW__
76
77 if (m_conn && m_conn->conn)
78 {
79 PQsetnonblocking(m_conn->conn, 1);
80 }
81
82 if (_processor != NULL)
83 {
84 m_processor = _processor;
85 if (_noticeHandler)
86 m_noticeHandler = _noticeHandler;
87 else
88 m_noticeHandler = (void *)this;
89 }
90 else
91 {
92 m_noticeHandler = (void *)this;
93 }
94 }
95
pgQueryThread(pgConn * _conn,const wxString & _qry,int _resultToRetrieve,wxWindow * _caller,long _eventId,void * _data)96 pgQueryThread::pgQueryThread(pgConn *_conn, const wxString &_qry,
97 int _resultToRetrieve, wxWindow *_caller, long _eventId, void *_data)
98 : wxThread(wxTHREAD_JOINABLE), m_currIndex(-1), m_conn(_conn),
99 m_cancelled(false), m_multiQueries(false), m_useCallable(false),
100 m_caller(NULL), m_processor(pgNoticeProcessor), m_noticeHandler(NULL),
101 m_eventOnCancellation(true)
102 {
103 if (m_conn && m_conn->conn)
104 {
105 PQsetnonblocking(m_conn->conn, 1);
106 }
107 m_queries.Add(
108 new pgBatchQuery(_qry, (pgParamsArray *)NULL, _eventId, _data, false,
109 _resultToRetrieve));
110
111 wxLogInfo(wxT("queueing : %s"), _qry.c_str());
112 m_noticeHandler = (void *)this;
113
114 if (_caller)
115 {
116 m_caller = _caller->GetEventHandler();
117 }
118 }
119
SetEventOnCancellation(bool eventOnCancelled)120 void pgQueryThread::SetEventOnCancellation(bool eventOnCancelled)
121 {
122 m_eventOnCancellation = eventOnCancelled;
123 }
124
AddQuery(const wxString & _qry,pgParamsArray * _params,long _eventId,void * _data,bool _useCallable,int _resultToRetrieve)125 void pgQueryThread::AddQuery(const wxString &_qry, pgParamsArray *_params,
126 long _eventId, void *_data, bool _useCallable, int _resultToRetrieve)
127 {
128 m_queries.Add(
129 new pgBatchQuery(_qry, _params, _eventId, _data,
130 // use callable statement only if supported
131 m_useCallable && _useCallable, _resultToRetrieve));
132
133 wxLogInfo(wxT("queueing (%ld): %s"), GetId(), _qry.c_str());
134 }
135
136
~pgQueryThread()137 pgQueryThread::~pgQueryThread()
138 {
139 m_conn->RegisterNoticeProcessor(0, 0);
140 WX_CLEAR_ARRAY(m_queries);
141 }
142
143
GetMessagesAndClear(int idx)144 wxString pgQueryThread::GetMessagesAndClear(int idx)
145 {
146 wxString msg;
147
148 if (idx == -1)
149 idx = m_currIndex;
150
151 if (idx >= 0 && idx <= m_currIndex)
152 {
153 wxCriticalSectionLocker lock(m_criticalSection);
154 msg = m_queries[idx]->m_message;
155 m_queries[idx]->m_message = wxT("");
156 }
157
158 return msg;
159 }
160
161
AppendMessage(const wxString & str)162 void pgQueryThread::AppendMessage(const wxString &str)
163 {
164 if (m_queries[m_currIndex]->m_message.IsEmpty())
165 {
166 wxCriticalSectionLocker lock(m_criticalSection);
167 if (str != wxT("\n"))
168 m_queries[m_currIndex]->m_message.Append(str);
169 }
170 else
171 {
172 wxCriticalSectionLocker lock(m_criticalSection);
173 m_queries[m_currIndex]->m_message.Append(wxT("\n")).Append(str);
174 }
175 }
176
177
Execute()178 int pgQueryThread::Execute()
179 {
180 wxMutexLocker lock(m_queriesLock);
181
182 PGresult *result = NULL;
183 wxMBConv &conv = *(m_conn->conv);
184
185 wxString &query = m_queries[m_currIndex]->m_query;
186 int &resultToRetrieve = m_queries[m_currIndex]->m_resToRetrieve;
187 long &rowsInserted = m_queries[m_currIndex]->m_rowsInserted;
188 Oid &insertedOid = m_queries[m_currIndex]->m_insertedOid;
189 // using the alias for the pointer here, in order to save the result back
190 // in the pgBatchQuery object
191 pgSet *&dataSet = m_queries[m_currIndex]->m_resultSet;
192 int &rc = m_queries[m_currIndex]->m_returnCode;
193 pgParamsArray *params = m_queries[m_currIndex]->m_params;
194 bool useCallable = m_queries[m_currIndex]->m_useCallable;
195 pgError &err = m_queries[m_currIndex]->m_err;
196
197 wxCharBuffer queryBuf = query.mb_str(conv);
198
199 if (PQstatus(m_conn->conn) != CONNECTION_OK)
200 {
201 rc = pgQueryResultEvent::PGQ_CONN_LOST;
202 err.msg_primary = _("Connection to the database server lost");
203
204 return(RaiseEvent(rc));
205 }
206
207 if (!queryBuf && !query.IsEmpty())
208 {
209 rc = pgQueryResultEvent::PGQ_STRING_INVALID;
210 m_conn->SetLastResultError(NULL, _("the query could not be converted to the required encoding."));
211 err.msg_primary = _("Query string is empty");
212
213 return(RaiseEvent(rc));
214 }
215
216 // Honour the parameters (if any)
217 if (params && params->GetCount() > 0)
218 {
219 int pCount = params->GetCount();
220 int ret = 0,
221 idx = 0;
222
223 Oid *pOids = (Oid *)malloc(pCount * sizeof(Oid));
224 const char **pParams = (const char **)malloc(pCount * sizeof(const char *));
225 int *pLens = (int *)malloc(pCount * sizeof(int));
226 int *pFormats = (int *)malloc(pCount * sizeof(int));
227 // modes are used only by enterprisedb callable statement
228 #if defined (__WXMSW__) || (EDB_LIBPQ)
229 int *pModes = (int *)malloc(pCount * sizeof(int));
230 #endif
231
232 for (; idx < pCount; idx++)
233 {
234 pgParam *param = (*params)[idx];
235
236 pOids[idx] = param->m_type;
237 pParams[idx] = (const char *)param->m_val;
238 pLens[idx] = param->m_len;
239 pFormats[idx] = param->GetFormat();
240 #if defined (__WXMSW__) || (EDB_LIBPQ)
241 pModes[idx] = param->m_mode;
242 #endif
243 }
244
245 if (useCallable)
246 {
247 #if defined (__WXMSW__) || (EDB_LIBPQ)
248 wxLogInfo(wxString::Format(
249 _("using an enterprisedb callable statement (queryid:%ld, threadid:%ld)"),
250 (long)m_currIndex, (long)GetId()));
251 wxString stmt = wxString::Format(wxT("pgQueryThread-%ld-%ld"), this->GetId(), m_currIndex);
252 PGresult *res = PQiPrepareOut(m_conn->conn, stmt.mb_str(wxConvUTF8),
253 queryBuf, pCount, pOids, pModes);
254
255 if( PQresultStatus(res) != PGRES_COMMAND_OK)
256 {
257 rc = pgQueryResultEvent::PGQ_ERROR_PREPARE_CALLABLE;
258 err.SetError(res, &conv);
259
260 PQclear(res);
261
262 goto return_with_error;
263 }
264
265 ret = PQiSendQueryPreparedOut(m_conn->conn, stmt.mb_str(wxConvUTF8),
266 pCount, pParams, pLens, pFormats, 1);
267
268 if (ret != 1)
269 {
270 rc = pgQueryResultEvent::PGQ_ERROR_EXECUTE_CALLABLE;
271
272 m_conn->SetLastResultError(NULL, _("Failed to run PQsendQuery in pgQueryThread"));
273 err.msg_primary = wxString(PQerrorMessage(m_conn->conn), conv);
274
275 PQclear(res);
276 res = NULL;
277
278 goto return_with_error;
279 }
280
281 PQclear(res);
282 res = NULL;
283 #else
284 rc = -1;
285 wxASSERT_MSG(false,
286 _("the program execution flow must not reach to this point in pgQueryThread"));
287
288 goto return_with_error;
289 #endif
290 }
291 else
292 {
293 // assumptions: we will need the results in text format only
294 ret = PQsendQueryParams(m_conn->conn, queryBuf, pCount, pOids, pParams, pLens, pFormats, 0);
295
296 if (ret != 1)
297 {
298 rc = pgQueryResultEvent::PGQ_ERROR_SEND_QUERY;
299
300 m_conn->SetLastResultError(NULL,
301 _("Failed to run PQsendQueryParams in pgQueryThread"));
302
303 err.msg_primary = _("Failed to run PQsendQueryParams in pgQueryThread.\n") +
304 wxString(PQerrorMessage(m_conn->conn), conv);
305
306 goto return_with_error;
307 }
308 }
309 goto continue_without_error;
310
311 return_with_error:
312 {
313 free(pOids);
314 free(pParams);
315 free(pLens);
316 free(pFormats);
317 #if defined (__WXMSW__) || (EDB_LIBPQ)
318 free(pModes);
319 #endif
320 return (RaiseEvent(rc));
321 }
322 }
323 else
324 {
325 // use the PQsendQuery api in case, we don't have any parameters to
326 // pass to the server
327 if (!PQsendQuery(m_conn->conn, queryBuf))
328 {
329 rc = pgQueryResultEvent::PGQ_ERROR_SEND_QUERY;
330
331 err.msg_primary = _("Failed to run PQsendQueryParams in pgQueryThread.\n") +
332 wxString(PQerrorMessage(m_conn->conn), conv);
333
334 return(RaiseEvent(rc));
335 }
336 }
337
338 continue_without_error:
339 int resultsRetrieved = 0;
340 PGresult *lastResult = 0;
341 bool connExecutionCancelled = false;
342
343 while (true)
344 {
345 // This is a 'joinable' thread, it is not advisable to call 'delete'
346 // function on this.
347 // Hence - it does not make sense to use the function 'testdestroy' here.
348 // We introduced the 'CancelExecution' function for the same purpose.
349 //
350 // We will have to call the CancelExecution function of pgConn to
351 // inform the backend that the user has cancelled the execution.
352 //
353 // We will need to consume all the results before quiting from the thread.
354 // Otherwise - the connection object will become unusable, and throw
355 // error - because libpq connections expects application to consume all
356 // the result, before executing another query
357 //
358 if (m_cancelled && rc != pgQueryResultEvent::PGQ_EXECUTION_CANCELLED)
359 {
360 // We shouldn't be calling cancellation multiple time
361 if (!connExecutionCancelled)
362 {
363 m_conn->CancelExecution();
364 connExecutionCancelled = true;
365 }
366 rc = pgQueryResultEvent::PGQ_EXECUTION_CANCELLED;
367
368 err.msg_primary = _("Execution Cancelled");
369
370 if (lastResult)
371 {
372 PQclear(lastResult);
373 lastResult = NULL;
374 }
375 }
376
377 if ((rc = PQconsumeInput(m_conn->conn)) != 1)
378 {
379 if (m_cancelled)
380 {
381 rc = pgQueryResultEvent::PGQ_EXECUTION_CANCELLED;
382 err.msg_primary = _("Execution Cancelled");
383
384 if (lastResult)
385 {
386 PQclear(lastResult);
387 lastResult = NULL;
388 }
389 // There is nothing more to consume.
390 // We can quit the thread now.
391 //
392 // Raise the event - if the component asked for it on
393 // execution cancellation.
394 if (m_eventOnCancellation)
395 RaiseEvent(rc);
396
397 return rc;
398 }
399
400 if (rc == 0)
401 {
402 err.msg_primary = wxString(PQerrorMessage(m_conn->conn), conv);
403 }
404 if (PQstatus(m_conn->conn) == CONNECTION_BAD)
405 {
406 err.msg_primary = _("Connection to the database server lost");
407 rc = pgQueryResultEvent::PGQ_CONN_LOST;
408 }
409 else
410 {
411 rc = pgQueryResultEvent::PGQ_ERROR_CONSUME_INPUT;
412 }
413
414 return(RaiseEvent(rc));
415 }
416
417 if (PQisBusy(m_conn->conn))
418 {
419 Yield();
420 this->Sleep(10);
421
422 continue;
423 }
424
425 // if resultToRetrieve is given, the nth result will be returned,
426 // otherwise the last result set will be returned.
427 // all others are discarded
428 PGresult *res = PQgetResult(m_conn->conn);
429
430 if (!res)
431 break;
432
433 if((PQresultStatus(res) == PGRES_NONFATAL_ERROR) ||
434 (PQresultStatus(res) == PGRES_FATAL_ERROR) ||
435 (PQresultStatus(res) == PGRES_BAD_RESPONSE))
436 {
437 result = res;
438 err.SetError(res, &conv);
439
440 // Wait for the execution to be finished
441 // We need to fetch all the results, before sending the error
442 // message
443 do
444 {
445 if (PQconsumeInput(m_conn->conn) != 1)
446 {
447 if (m_cancelled)
448 {
449 rc = pgQueryResultEvent::PGQ_EXECUTION_CANCELLED;
450
451 // Release the result as the query execution has been cancelled by the
452 // user
453 if (result)
454 PQclear(result);
455
456 result = NULL;
457
458 if (m_eventOnCancellation)
459 RaiseEvent(rc);
460
461 return rc;
462 }
463 goto out_of_consume_input_loop;
464 }
465
466 if ((res = PQgetResult(m_conn->conn)) == NULL)
467 {
468 goto out_of_consume_input_loop;
469 }
470
471 // Release the temporary results
472 PQclear(res);
473 res = NULL;
474
475 if (PQisBusy(m_conn->conn))
476 {
477 Yield();
478 this->Sleep(10);
479 }
480 }
481 while (true);
482
483 break;
484 }
485
486 #if defined (__WXMSW__) || (EDB_LIBPQ)
487 // there should be 2 results in the callable statement - the first is the
488 // dummy, the second contains our out params.
489 if (useCallable)
490 {
491 PQclear(res);
492 result = PQiGetOutResult(m_conn->conn);
493 }
494 #endif
495 if (PQresultStatus(res) == PGRES_COPY_IN)
496 {
497 rc = PGRES_COPY_IN;
498 PQputCopyEnd(m_conn->conn, "not supported by pgadmin");
499 }
500
501 if (PQresultStatus(res) == PGRES_COPY_OUT)
502 {
503 int copyRc;
504 char *buf;
505 int copyRows = 0;
506 int lastCopyRc = 0;
507
508 rc = PGRES_COPY_OUT;
509
510 AppendMessage(_("query returned copy data:\n"));
511
512 while((copyRc = PQgetCopyData(m_conn->conn, &buf, 1)) >= 0)
513 {
514 // Ignore processing the query result, when it has already been
515 // cancelled by the user
516 if (m_cancelled)
517 {
518 if (!connExecutionCancelled)
519 {
520 m_conn->CancelExecution();
521 connExecutionCancelled = true;
522 }
523 continue;
524 }
525
526 if (buf != NULL)
527 {
528 if (copyRows < 100)
529 {
530 wxString str(buf, conv);
531 wxCriticalSectionLocker cs(m_criticalSection);
532 m_queries[m_currIndex]->m_message.Append(str);
533
534 }
535 else if (copyRows == 100)
536 AppendMessage(_("Query returned more than 100 copy rows, discarding the rest...\n"));
537
538 PQfreemem(buf);
539 }
540 if (copyRc > 0)
541 copyRows++;
542
543 if (lastCopyRc == 0 && copyRc == 0)
544 {
545 Yield();
546 this->Sleep(10);
547 }
548 if (copyRc == 0)
549 {
550 if (!PQconsumeInput(m_conn->conn))
551 {
552 // It might be the case - it is a result of the
553 // execution cancellation.
554 if (m_cancelled)
555 {
556 rc = pgQueryResultEvent::PGQ_EXECUTION_CANCELLED;
557
558 // Release the result as the query execution has been cancelled by the
559 // user
560 if (result)
561 PQclear(result);
562
563 result = NULL;
564
565 if (m_eventOnCancellation)
566 RaiseEvent(rc);
567
568 return rc;
569 }
570 if (PQstatus(m_conn->conn) == CONNECTION_BAD)
571 {
572 err.msg_primary = _("Connection to the database server lost");
573 rc = pgQueryResultEvent::PGQ_CONN_LOST;
574 }
575 else
576 {
577 rc = pgQueryResultEvent::PGQ_ERROR_CONSUME_INPUT;
578
579 err.msg_primary = wxString(PQerrorMessage(m_conn->conn), conv);
580 }
581 return(RaiseEvent(rc));
582 }
583 }
584 lastCopyRc = copyRc;
585 }
586
587 res = PQgetResult(m_conn->conn);
588
589 if (!res)
590 break;
591 }
592
593 resultsRetrieved++;
594
595 // Save the current result, as asked by the component
596 // But - only if the execution is not cancelled
597 if (!m_cancelled && resultsRetrieved == resultToRetrieve)
598 {
599 result = res;
600 insertedOid = PQoidValue(res);
601 if (insertedOid && insertedOid != (Oid) - 1)
602 AppendMessage(wxString::Format(_("query inserted one row with oid %d.\n"), insertedOid));
603 else
604 AppendMessage(wxString::Format(wxPLURAL("query result with %d row will be returned.\n", "query result with %d rows will be returned.\n",
605 PQntuples(result)), PQntuples(result)));
606 continue;
607 }
608
609 if (lastResult)
610 {
611 if (!m_cancelled && PQntuples(lastResult))
612 AppendMessage(wxString::Format(wxPLURAL("query result with %d row discarded.\n", "query result with %d rows discarded.\n",
613 PQntuples(lastResult)), PQntuples(lastResult)));
614 PQclear(lastResult);
615 }
616 lastResult = res;
617 }
618
619 out_of_consume_input_loop:
620 if (m_cancelled)
621 {
622 rc = pgQueryResultEvent::PGQ_EXECUTION_CANCELLED;
623
624 // Release the result as the query execution has been cancelled by the
625 // user
626 if (result)
627 PQclear(result);
628
629 result = NULL;
630
631 if (m_eventOnCancellation)
632 RaiseEvent(rc);
633
634 return rc;
635 }
636
637 if (!result)
638 result = lastResult;
639
640 err.SetError(result, &conv);
641
642 AppendMessage(wxT("\n"));
643
644 rc = PQresultStatus(result);
645 if (rc == PGRES_TUPLES_OK)
646 {
647 dataSet = new pgSet(result, m_conn, conv, m_conn->needColQuoting);
648 dataSet->MoveFirst();
649 }
650 else if (rc == PGRES_COMMAND_OK)
651 {
652 char *s = PQcmdTuples(result);
653 if (*s)
654 rowsInserted = atol(s);
655 }
656 else if (rc == PGRES_FATAL_ERROR ||
657 rc == PGRES_NONFATAL_ERROR ||
658 rc == PGRES_BAD_RESPONSE)
659 {
660 if (result)
661 {
662 AppendMessage(wxString(PQresultErrorMessage(result), conv));
663 PQclear(result);
664 result = NULL;
665 }
666 else
667 {
668 AppendMessage(wxString(PQerrorMessage(m_conn->conn), conv));
669 }
670
671 return(RaiseEvent(rc));
672 }
673
674 insertedOid = PQoidValue(result);
675 if (insertedOid == (Oid) - 1)
676 insertedOid = 0;
677
678 return(RaiseEvent(1));
679 }
680
RaiseEvent(int _retval)681 int pgQueryThread::RaiseEvent(int _retval)
682 {
683 #if !defined(PGSCLI)
684 if (m_caller)
685 {
686 pgQueryResultEvent resultEvent(GetId(), m_queries[m_currIndex], m_queries[m_currIndex]->m_eventID);
687
688 // client data
689 resultEvent.SetClientData(m_queries[m_currIndex]->m_data);
690 resultEvent.SetInt(_retval);
691
692 m_caller->AddPendingEvent(resultEvent);
693 }
694 #endif
695 return _retval;
696 }
697
698
Entry()699 void *pgQueryThread::Entry()
700 {
701 do
702 {
703 if (m_currIndex < (((int)m_queries.GetCount()) - 1))
704 {
705 // Create the PGcancel object to enable cancelling the running
706 // query
707 m_conn->SetConnCancel();
708 m_currIndex++;
709
710 m_queries[m_currIndex]->m_returnCode = -2;
711 m_queries[m_currIndex]->m_rowsInserted = -1l;
712
713 wxLogSql(wxT("Thread executing query (%d:%s:%d): %s"),
714 m_currIndex + 1, m_conn->GetHost().c_str(), m_conn->GetPort(),
715 m_queries[m_currIndex]->m_query.c_str());
716
717 // register the notice processor for the current query
718 m_conn->RegisterNoticeProcessor(m_processor, m_noticeHandler);
719
720 // execute the current query now
721 Execute();
722
723 // remove the notice processor now
724 m_conn->RegisterNoticeProcessor(0, 0);
725
726 // reset the PGcancel object
727 m_conn->ResetConnCancel();
728 }
729
730 if (!m_multiQueries || m_cancelled)
731 break;
732
733 wxThread::Sleep(10);
734 }
735 while (true);
736
737 return(NULL);
738 }
739
DeleteReleasedQueries()740 int pgQueryThread::DeleteReleasedQueries()
741 {
742 int res = 0,
743 idx = 0;
744
745 if (m_queriesLock.TryLock() == wxMUTEX_BUSY)
746 return res;
747
748 for (; idx <= m_currIndex; idx++)
749 {
750 if (m_queries[idx]->m_resultSet != NULL)
751 {
752 pgSet *set = m_queries[idx]->m_resultSet;
753 m_queries[idx]->m_resultSet = NULL;
754 delete set;
755 set = NULL;
756
757 res++;
758 }
759 }
760 m_queriesLock.Unlock();
761
762 return res;
763 }
764
765
GetResultError(int idx)766 pgError pgQueryThread::GetResultError(int idx)
767 {
768 wxMutexLocker lock(m_queriesLock);
769
770 if (idx == -1)
771 idx = m_currIndex;
772
773 return m_queries[idx]->m_err;
774 }
775
776
GetErrorMessage()777 const wxString &pgBatchQuery::GetErrorMessage()
778 {
779 return m_err.msg_primary;
780 }
781
~pgBatchQuery()782 pgBatchQuery::~pgBatchQuery()
783 {
784 if (m_resultSet)
785 {
786 delete m_resultSet;
787 m_resultSet = NULL;
788 }
789
790 if (m_params)
791 {
792 WX_CLEAR_ARRAY((*m_params));
793 delete m_params;
794 m_params = NULL;
795 }
796 }
797
Release()798 bool pgBatchQuery::Release()
799 {
800 bool res = false;
801
802 if (m_resultSet != NULL)
803 {
804 res = true;
805
806 pgSet *set = m_resultSet;
807 m_resultSet = NULL;
808
809 if (set)
810 delete set;
811 set = NULL;
812 }
813
814 if (m_params)
815 {
816 res = true;
817
818 WX_CLEAR_ARRAY((*m_params));
819 delete m_params;
820 m_params = NULL;
821 }
822
823 return res;
824 }
825
pgQueryResultEvent(unsigned long _thrdId,pgBatchQuery * _qry,int _id)826 pgQueryResultEvent::pgQueryResultEvent(
827 unsigned long _thrdId, pgBatchQuery *_qry, int _id) :
828 wxCommandEvent(PGQueryResultEvent, _id), m_thrdId(_thrdId),
829 m_query(_qry) { }
830
pgQueryResultEvent(const pgQueryResultEvent & _ev)831 pgQueryResultEvent::pgQueryResultEvent(const pgQueryResultEvent &_ev)
832 : wxCommandEvent(_ev), m_thrdId(_ev.m_thrdId), m_query(_ev.m_query) { }
833
834
pgParam(Oid _type,void * _val,int _len,short _mode)835 pgParam::pgParam(Oid _type, void *_val, int _len, short _mode)
836 : m_type(_type), m_val(_val), m_len(_len), m_mode(_mode)
837 {
838 switch(m_type)
839 {
840 case PGOID_TYPE_CHAR:
841 case PGOID_TYPE_NAME:
842 case PGOID_TYPE_TEXT:
843 case PGOID_TYPE_VARCHAR:
844 case PGOID_TYPE_CSTRING:
845 m_format = 0;
846 default:
847 m_format = 1;
848 }
849 }
850
851 // wxString data
pgParam(Oid _oid,wxString * _val,wxMBConv * _conv,short _mode)852 pgParam::pgParam(Oid _oid, wxString *_val, wxMBConv *_conv, short _mode)
853 : m_mode(_mode)
854 {
855 if (m_mode == PG_PARAM_OUT || !_val)
856 {
857 m_len = 0;
858 }
859 else
860 {
861 m_len = _val->Len();
862 }
863 if (_val)
864 {
865 char *str = (char *)malloc(m_len + 1);
866 if (!_val->IsEmpty() && _mode != PG_PARAM_OUT)
867 {
868 strncpy(str,
869 (const char *)_val->mb_str(
870 *(_conv != NULL ? _conv : &wxConvLocal)), m_len);
871 str[m_len] = '\0';
872 }
873 else
874 {
875 str[0] = '\0';
876 }
877 m_val = (void *)(str);
878 }
879 else
880 {
881 m_val = NULL;
882 }
883 m_type = _oid;
884
885 // text format
886 m_format = 0;
887 }
888
889
~pgParam()890 pgParam::~pgParam()
891 {
892 if (m_val)
893 free(m_val);
894 m_val = NULL;
895 }
896
897
GetFormat()898 int pgParam::GetFormat()
899 {
900 return m_format;
901 }
902