1 /************ TabPivot C++ Program Source Code File (.CPP) *************/
2 /* PROGRAM NAME: TABPIVOT */
3 /* ------------- */
4 /* Version 1.7 */
5 /* */
6 /* COPYRIGHT: */
7 /* ---------- */
8 /* (C) Copyright to the author Olivier BERTRAND 2005-2017 */
9 /* */
10 /* WHAT THIS PROGRAM DOES: */
11 /* ----------------------- */
/*  This program contains the PIVOT classes DB execution routines.    */
13 /***********************************************************************/
14
15 /***********************************************************************/
16 /* Include relevant sections of the operating system header file. */
17 /***********************************************************************/
18 #include "my_global.h"
19 #include "table.h" // MySQL table definitions
20 #if defined(_WIN32)
21 #if defined(__BORLANDC__)
22 #define __MFC_COMPAT__ // To define min/max as macro
23 #endif
24 //#include <windows.h>
25 #elif defined(UNIX)
26 #include <errno.h>
27 #include <unistd.h>
28 #include "osutil.h"
29 #else
30 #include <io.h>
31 #endif
32
33 /***********************************************************************/
34 /* Include application header files: */
35 /* global.h is header containing all global declarations. */
36 /* plgdbsem.h is header containing the DB application declarations. */
37 /***********************************************************************/
38 #define FRM_VER 6
39 #include "sql_const.h"
40 #include "field.h"
41 #include "global.h"
42 #include "plgdbsem.h"
43 #include "xtable.h"
44 #include "tabext.h"
45 #include "tabcol.h"
46 #include "colblk.h"
47 #include "tabmysql.h"
48 #include "csort.h"
49 #include "tabutil.h"
50 #include "tabpivot.h"
51 #include "valblk.h"
52 #include "ha_connect.h"
53
54 /***********************************************************************/
55 /* Make the Pivot table column list. */
56 /***********************************************************************/
PivotColumns(PGLOBAL g,const char * tab,const char * src,const char * picol,const char * fncol,const char * skcol,const char * host,const char * db,const char * user,const char * pwd,int port)57 PQRYRES PivotColumns(PGLOBAL g, const char *tab, const char *src,
58 const char *picol, const char *fncol,
59 const char *skcol, const char *host,
60 const char *db, const char *user,
61 const char *pwd, int port)
62 {
63 PIVAID pvd(tab, src, picol, fncol, skcol, host, db, user, pwd, port);
64
65 return pvd.MakePivotColumns(g);
66 } // end of PivotColumns
67
/* --------------- Implementation of the PIVAID class ---------------- */
69
70 /***********************************************************************/
71 /* PIVAID constructor. */
72 /***********************************************************************/
PIVAID(const char * tab,const char * src,const char * picol,const char * fncol,const char * skcol,const char * host,const char * db,const char * user,const char * pwd,int port)73 PIVAID::PIVAID(const char *tab, const char *src, const char *picol,
74 const char *fncol, const char *skcol, const char *host,
75 const char *db, const char *user, const char *pwd,
76 int port) : CSORT(false)
77 {
78 Host = (char*)host;
79 User = (char*)user;
80 Pwd = (char*)pwd;
81 Qryp = NULL;
82 Database = (char*)db;
83 Tabname = (char*)tab;
84 Tabsrc = (char*)src;
85 Picol = (char*)picol;
86 Fncol = (char*)fncol;
87 Skcol = (char*)skcol;
88 Rblkp = NULL;
89 Port = (port) ? port : GetDefaultPort();
90 } // end of PIVAID constructor
91
92 /***********************************************************************/
93 /* Skip columns that are in the skipped column list. */
94 /***********************************************************************/
SkipColumn(PCOLRES crp,char * skc)95 bool PIVAID::SkipColumn(PCOLRES crp, char *skc)
96 {
97 if (skc)
98 for (char *p = skc; *p; p += (strlen(p) + 1))
99 if (!stricmp(crp->Name, p))
100 return true;
101
102 return false;
103 } // end of SkipColumn
104
105 /***********************************************************************/
106 /* Make the Pivot table column list. */
107 /***********************************************************************/
MakePivotColumns(PGLOBAL g)108 PQRYRES PIVAID::MakePivotColumns(PGLOBAL g)
109 {
110 char *p, *query, *colname, *skc, buf[64];
111 int ndif, nblin, w = 0;
112 bool b = false;
113 PVAL valp;
114 PQRYRES qrp;
115 PCOLRES *pcrp, crp, fncrp = NULL;
116
117 try {
118 // Are there columns to skip?
119 if (Skcol) {
120 uint n = strlen(Skcol);
121
122 skc = (char*)PlugSubAlloc(g, NULL, n + 2);
123 strcpy(skc, Skcol);
124 skc[n + 1] = 0;
125
126 // Replace ; by nulls in skc
127 for (p = strchr(skc, ';'); p; p = strchr(p, ';'))
128 *p++ = 0;
129
130 } else
131 skc = NULL;
132
133 if (!Tabsrc && Tabname) {
134 // Locate the query
135 query = (char*)PlugSubAlloc(g, NULL, strlen(Tabname) + 26);
136 sprintf(query, "SELECT * FROM `%s` LIMIT 1", Tabname);
137 } else if (!Tabsrc) {
138 strcpy(g->Message, MSG(SRC_TABLE_UNDEF));
139 goto err;
140 } else
141 query = (char*)Tabsrc;
142
143 // Open a MySQL connection for this table
144 if (!Myc.Open(g, Host, Database, User, Pwd, Port)) {
145 b = true;
146
147 // Returned values must be in their original character set
148 if (Myc.ExecSQL(g, "SET character_set_results=NULL", &w) == RC_FX)
149 goto err;
150 else
151 Myc.FreeResult();
152
153 } else
154 goto err;
155
156 // Send the source command to MySQL
157 if (Myc.ExecSQL(g, query, &w) == RC_FX)
158 goto err;
159
160 // We must have a storage query to get pivot column values
161 if (!(Qryp = Myc.GetResult(g, true)))
162 goto err;
163
164 if (!Fncol) {
165 for (crp = Qryp->Colresp; crp; crp = crp->Next)
166 if ((!Picol || stricmp(Picol, crp->Name)) && !SkipColumn(crp, skc))
167 Fncol = crp->Name;
168
169 if (!Fncol) {
170 strcpy(g->Message, MSG(NO_DEF_FNCCOL));
171 goto err;
172 } // endif Fncol
173
174 } // endif Fncol
175
176 if (!Picol) {
177 // Find default Picol as the last one not equal to Fncol
178 for (crp = Qryp->Colresp; crp; crp = crp->Next)
179 if (stricmp(Fncol, crp->Name) && !SkipColumn(crp, skc))
180 Picol = crp->Name;
181
182 if (!Picol) {
183 strcpy(g->Message, MSG(NO_DEF_PIVOTCOL));
184 goto err;
185 } // endif Picol
186
187 } // endif picol
188
189 // Prepare the column list
190 for (pcrp = &Qryp->Colresp; (crp = *pcrp); )
191 if (SkipColumn(crp, skc)) {
192 // Ignore this column
193 *pcrp = crp->Next;
194 } else if (!stricmp(Picol, crp->Name)) {
195 if (crp->Nulls) {
196 sprintf(g->Message, "Pivot column %s cannot be nullable", Picol);
197 goto err;
198 } // endif Nulls
199
200 Rblkp = crp->Kdata;
201 *pcrp = crp->Next;
202 } else if (!stricmp(Fncol, crp->Name)) {
203 fncrp = crp;
204 *pcrp = crp->Next;
205 } else
206 pcrp = &crp->Next;
207
208 if (!Rblkp) {
209 strcpy(g->Message, MSG(NO_DEF_PIVOTCOL));
210 goto err;
211 } else if (!fncrp) {
212 strcpy(g->Message, MSG(NO_DEF_FNCCOL));
213 goto err;
214 } // endif
215
216 if (Tabsrc) {
217 Myc.Close();
218 b = false;
219
220 // Before calling sort, initialize all
221 nblin = Qryp->Nblin;
222
223 Index.Size = nblin * sizeof(int);
224 Index.Sub = TRUE; // Should be small enough
225
226 if (!PlgDBalloc(g, NULL, Index))
227 goto err;
228
229 Offset.Size = (nblin + 1) * sizeof(int);
230 Offset.Sub = TRUE; // Should be small enough
231
232 if (!PlgDBalloc(g, NULL, Offset))
233 goto err;
234
235 ndif = Qsort(g, nblin);
236
237 if (ndif < 0) // error
238 goto err;
239
240 } else {
241 // The query was limited, we must get pivot column values
242 // Returned values must be in their original character set
243 // if (Myc.ExecSQL(g, "SET character_set_results=NULL", &w) == RC_FX)
244 // goto err;
245
246 query = (char*)PlugSubAlloc(g, NULL, 0);
247 sprintf(query, "SELECT DISTINCT `%s` FROM `%s`", Picol, Tabname);
248 PlugSubAlloc(g, NULL, strlen(query) + 1);
249 Myc.FreeResult();
250
251 // Send the source command to MySQL
252 if (Myc.ExecSQL(g, query, &w) == RC_FX)
253 goto err;
254
255 // We must have a storage query to get pivot column values
256 if (!(qrp = Myc.GetResult(g, true)))
257 goto err;
258
259 Myc.Close();
260 b = false;
261
262 // Get the column list
263 crp = qrp->Colresp;
264 Rblkp = crp->Kdata;
265 ndif = qrp->Nblin;
266 } // endif Tabsrc
267
268 // Allocate the Value used to retieve column names
269 if (!(valp = AllocateValue(g, Rblkp->GetType(),
270 Rblkp->GetVlen(),
271 Rblkp->GetPrec())))
272 goto err;
273
274 // Now make the functional columns
275 for (int i = 0; i < ndif; i++) {
276 if (i) {
277 crp = (PCOLRES)PlugSubAlloc(g, NULL, sizeof(COLRES));
278 memcpy(crp, fncrp, sizeof(COLRES));
279 } else
280 crp = fncrp;
281
282 // Get the value that will be the generated column name
283 if (Tabsrc)
284 valp->SetValue_pvblk(Rblkp, Pex[Pof[i]]);
285 else
286 valp->SetValue_pvblk(Rblkp, i);
287
288 colname = valp->GetCharString(buf);
289 crp->Name = PlugDup(g, colname);
290 crp->Flag = 1;
291
292 // Add this column
293 *pcrp = crp;
294 crp->Next = NULL;
295 pcrp = &crp->Next;
296 } // endfor i
297
298 // We added ndif columns and removed 2 (picol and fncol)
299 Qryp->Nbcol += (ndif - 2);
300 return Qryp;
301 } catch (int n) {
302 if (trace(1))
303 htrc("Exception %d: %s\n", n, g->Message);
304 } catch (const char *msg) {
305 strcpy(g->Message, msg);
306 } // end catch
307
308 err:
309 if (b)
310 Myc.Close();
311
312 return NULL;
313 } // end of MakePivotColumns
314
315 /***********************************************************************/
316 /* PIVAID: Compare routine for sorting pivot column values. */
317 /***********************************************************************/
Qcompare(int * i1,int * i2)318 int PIVAID::Qcompare(int *i1, int *i2)
319 {
320 // TODO: the actual comparison between pivot column result values.
321 return Rblkp->CompVal(*i1, *i2);
322 } // end of Qcompare
323
324 /* --------------- Implementation of the PIVOT classes --------------- */
325
326 /***********************************************************************/
327 /* PIVOTDEF constructor. */
328 /***********************************************************************/
PIVOTDEF(void)329 PIVOTDEF::PIVOTDEF(void)
330 {
331 Host = User = Pwd = DB = NULL;
332 Tabname = Tabsrc = Picol = Fncol = Function = NULL;
333 GBdone = Accept = false;
334 Port = 0;
335 } // end of PIVOTDEF constructor
336
337 /***********************************************************************/
338 /* DefineAM: define specific AM block values from PIVOT table. */
339 /***********************************************************************/
DefineAM(PGLOBAL g,LPCSTR am,int poff)340 bool PIVOTDEF::DefineAM(PGLOBAL g, LPCSTR am, int poff)
341 {
342 char *p1, *p2;
343 PHC hc __attribute__((unused))= ((MYCAT*)Cat)->GetHandler();
344
345 if (PRXDEF::DefineAM(g, am, poff))
346 return TRUE;
347
348 Tabname = (char*)Tablep->GetName();
349 DB = (char*)Tablep->GetSchema();
350 Tabsrc = (char*)Tablep->GetSrc();
351
352 Host = GetStringCatInfo(g, "Host", "localhost");
353 User = GetStringCatInfo(g, "User", "*");
354 Pwd = GetStringCatInfo(g, "Password", NULL);
355 Picol = GetStringCatInfo(g, "PivotCol", NULL);
356 Fncol = GetStringCatInfo(g, "FncCol", NULL);
357
358 // If fncol is like avg(colname), separate Fncol and Function
359 if (Fncol && (p1 = strchr(Fncol, '(')) && (p2 = strchr(p1, ')')) &&
360 (*Fncol != '"') && (!*(p2+1))) {
361 *p1++ = '\0'; *p2 = '\0';
362 Function = Fncol;
363 Fncol = p1;
364 } else
365 Function = GetStringCatInfo(g, "Function", "SUM");
366
367 GBdone = GetBoolCatInfo("Groupby", false);
368 Accept = GetBoolCatInfo("Accept", false);
369 Port = GetIntCatInfo("Port", 3306);
370 Desc = (Tabsrc) ? Tabsrc : Tabname;
371 return FALSE;
372 } // end of DefineAM
373
374 /***********************************************************************/
375 /* GetTable: makes a new TDB of the proper type. */
376 /***********************************************************************/
GetTable(PGLOBAL g,MODE)377 PTDB PIVOTDEF::GetTable(PGLOBAL g, MODE)
378 {
379 return new(g) TDBPIVOT(this);
380 } // end of GetTable
381
382 /* ------------------------------------------------------------------- */
383
384 /***********************************************************************/
385 /* Implementation of the TDBPIVOT class. */
386 /***********************************************************************/
TDBPIVOT(PPIVOTDEF tdp)387 TDBPIVOT::TDBPIVOT(PPIVOTDEF tdp) : TDBPRX(tdp)
388 {
389 Host = tdp->Host;
390 Database = tdp->DB;
391 User = tdp->User;
392 Pwd = tdp->Pwd;
393 Port = tdp->Port;
394 Tabname = tdp->Tabname; // Name of source table
395 Tabsrc = tdp->Tabsrc; // SQL description of source table
396 Picol = tdp->Picol; // Pivot column name
397 Fncol = tdp->Fncol; // Function column name
398 Function = tdp->Function; // Aggregate function name
399 Xcolp = NULL; // To the FNCCOL column
400 //Xresp = NULL; // To the pivot result column
401 //Rblkp = NULL; // The value block of the pivot column
402 Fcolp = NULL; // To the function column
403 Dcolp = NULL; // To the dump column
404 GBdone = tdp->GBdone;
405 Accept = tdp->Accept;
406 Mult = -1; // Estimated table size
407 N = 0; // The current table index
408 M = 0; // The occurence rank
409 FileStatus = 0; // Logical End-of-File
410 RowFlag = 0; // 0: Ok, 1: Same, 2: Skip
411 } // end of TDBPIVOT constructor
412
413 /***********************************************************************/
414 /* Allocate source column description block. */
415 /***********************************************************************/
MakeCol(PGLOBAL g,PCOLDEF cdp,PCOL cprec,int n)416 PCOL TDBPIVOT::MakeCol(PGLOBAL g, PCOLDEF cdp, PCOL cprec, int n)
417 {
418 PCOL colp;
419
420 if (cdp->GetOffset()) {
421 colp = new(g) FNCCOL(cdp, this, cprec, n);
422
423 if (cdp->GetOffset() > 1)
424 Dcolp = colp;
425
426 } else
427 colp = new(g) SRCCOL(cdp, this, cprec, n);
428
429 return colp;
430 } // end of MakeCol
431
432 /***********************************************************************/
433 /* Find default fonction and pivot columns. */
434 /***********************************************************************/
FindDefaultColumns(PGLOBAL g)435 bool TDBPIVOT::FindDefaultColumns(PGLOBAL g)
436 {
437 PCOLDEF cdp;
438 PTABDEF defp = Tdbp->GetDef();
439
440 if (!Fncol) {
441 for (cdp = defp->GetCols(); cdp; cdp = cdp->GetNext())
442 if (!Picol || stricmp(Picol, cdp->GetName()))
443 Fncol = cdp->GetName();
444
445 if (!Fncol) {
446 strcpy(g->Message, MSG(NO_DEF_FNCCOL));
447 return true;
448 } // endif Fncol
449
450 } // endif Fncol
451
452 if (!Picol) {
453 // Find default Picol as the last one not equal to Fncol
454 for (cdp = defp->GetCols(); cdp; cdp = cdp->GetNext())
455 if (stricmp(Fncol, cdp->GetName()))
456 Picol = cdp->GetName();
457
458 if (!Picol) {
459 strcpy(g->Message, MSG(NO_DEF_PIVOTCOL));
460 return true;
461 } // endif Picol
462
463 } // endif Picol
464
465 return false;
466 } // end of FindDefaultColumns
467
468 /***********************************************************************/
469 /* Prepare the source table Query. */
470 /***********************************************************************/
GetSourceTable(PGLOBAL g)471 bool TDBPIVOT::GetSourceTable(PGLOBAL g)
472 {
473 if (Tdbp)
474 return false; // Already done
475
476 if (!Tabsrc && Tabname) {
477 // Get the table description block of this table
478 if (!(Tdbp = GetSubTable(g, ((PPIVOTDEF)To_Def)->Tablep, true)))
479 return true;
480
481 if (!GBdone) {
482 char *colist;
483 PCOLDEF cdp;
484
485 if (FindDefaultColumns(g))
486 return true;
487
488 // Locate the suballocated colist (size is not known yet)
489 *(colist = (char*)PlugSubAlloc(g, NULL, 0)) = 0;
490
491 // Make the column list
492 for (cdp = To_Def->GetCols(); cdp; cdp = cdp->GetNext())
493 if (!cdp->GetOffset())
494 strcat(strcat(colist, cdp->GetName()), ", ");
495
496 // Add the Pivot column at the end of the list
497 strcat(colist, Picol);
498
499 // Now we know how much was suballocated
500 PlugSubAlloc(g, NULL, strlen(colist) + 1);
501
502 // Locate the source string (size is not known yet)
503 Tabsrc = (char*)PlugSubAlloc(g, NULL, 0);
504
505 // Start making the definition
506 strcat(strcat(strcpy(Tabsrc, "SELECT "), colist), ", ");
507
508 // Make it suitable for Pivot by doing the group by
509 strcat(strcat(Tabsrc, Function), "(");
510 strcat(strcat(strcat(Tabsrc, Fncol), ") "), Fncol);
511 strcat(strcat(Tabsrc, " FROM "), Tabname);
512 strcat(strcat(Tabsrc, " GROUP BY "), colist);
513
514 if (Tdbp->IsView()) // Until MariaDB bug is fixed
515 strcat(strcat(Tabsrc, " ORDER BY "), colist);
516
517 // Now we know how much was suballocated
518 PlugSubAlloc(g, NULL, strlen(Tabsrc) + 1);
519 } // endif !GBdone
520
521 } else if (!Tabsrc) {
522 strcpy(g->Message, MSG(SRC_TABLE_UNDEF));
523 return true;
524 } // endif
525
526 if (Tabsrc) {
527 // Get the new table description block of this source table
528 PTABLE tablep = new(g) XTAB("whatever", Tabsrc);
529
530 tablep->SetSchema(Database);
531
532 if (!(Tdbp = GetSubTable(g, tablep, true)))
533 return true;
534
535 } // endif Tabsrc
536
537 return false;
538 } // end of GetSourceTable
539
540 /***********************************************************************/
541 /* Make the required pivot columns. */
542 /***********************************************************************/
MakePivotColumns(PGLOBAL g)543 bool TDBPIVOT::MakePivotColumns(PGLOBAL g)
544 {
545 if (!Tdbp->IsView()) {
546 // This was not done yet if GBdone is true
547 if (FindDefaultColumns(g))
548 return true;
549
550 // Now it is time to allocate the pivot and function columns
551 if (!(Fcolp = Tdbp->ColDB(g, Fncol, 0))) {
552 // Function column not found in table
553 sprintf(g->Message, MSG(COL_ISNOT_TABLE), Fncol, Tabname);
554 return true;
555 } else if (Fcolp->InitValue(g))
556 return true;
557
558 if (!(Xcolp = Tdbp->ColDB(g, Picol, 0))) {
559 // Pivot column not found in table
560 sprintf(g->Message, MSG(COL_ISNOT_TABLE), Picol, Tabname);
561 return true;
562 } else if (Xcolp->InitValue(g))
563 return true;
564
565 // Check and initialize the subtable columns
566 for (PCOL cp = Columns; cp; cp = cp->GetNext())
567 if (cp->GetAmType() == TYPE_AM_SRC) {
568 if (((PSRCCOL)cp)->Init(g, NULL))
569 return TRUE;
570
571 } else if (cp->GetAmType() == TYPE_AM_FNC)
572 if (((PFNCCOL)cp)->InitColumn(g))
573 return TRUE;
574
575 } // endif isview
576
577 return false;
578 } // end of MakePivotColumns
579
580 /***********************************************************************/
581 /* Make the required pivot columns for an object view. */
582 /***********************************************************************/
MakeViewColumns(PGLOBAL g)583 bool TDBPIVOT::MakeViewColumns(PGLOBAL g)
584 {
585 if (Tdbp->IsView()) {
586 // Tdbp is a view ColDB cannot be used
587 PCOL colp, cp;
588 PTDBMY tdbp;
589
590 if (Tdbp->GetAmType() != TYPE_AM_MYSQL) {
591 strcpy(g->Message, "View is not MySQL");
592 return true;
593 } else
594 tdbp = (PTDBMY)Tdbp;
595
596 if (!Fncol && !(Fncol = tdbp->FindFieldColumn(Picol))) {
597 strcpy(g->Message, MSG(NO_DEF_FNCCOL));
598 return true;
599 } // endif Fncol
600
601 if (!Picol && !(Picol = tdbp->FindFieldColumn(Fncol))) {
602 strcpy(g->Message, MSG(NO_DEF_PIVOTCOL));
603 return true;
604 } // endif Picol
605
606 // Now it is time to allocate the pivot and function columns
607 if (!(Fcolp = tdbp->MakeFieldColumn(g, Fncol)))
608 return true;
609
610 if (!(Xcolp = tdbp->MakeFieldColumn(g, Picol)))
611 return true;
612
613 // Check and initialize the subtable columns
614 for (cp = Columns; cp; cp = cp->GetNext())
615 if (cp->GetAmType() == TYPE_AM_SRC) {
616 if ((colp = tdbp->MakeFieldColumn(g, cp->GetName()))) {
617 ((PSRCCOL)cp)->Colp = colp;
618 ((PSRCCOL)cp)->To_Val = colp->GetValue();
619 cp->AddStatus(BUF_READ); // All is done here
620 } else
621 return true;
622
623 } else if (cp->GetAmType() == TYPE_AM_FNC)
624 if (((PFNCCOL)cp)->InitColumn(g))
625 return TRUE;
626
627 } // endif isview
628
629 return false;
630 } // end of MakeViewColumns
631
632 /***********************************************************************/
633 /* PIVOT GetMaxSize: returns the maximum number of rows in the table. */
634 /***********************************************************************/
GetMaxSize(PGLOBAL g)635 int TDBPIVOT::GetMaxSize(PGLOBAL g __attribute__((unused)))
636 {
637 #if 0
638 if (MaxSize < 0)
639 MaxSize = MakePivotColumns(g);
640
641 return MaxSize;
642 #endif // 0
643 return 10;
644 } // end of GetMaxSize
645
646 /***********************************************************************/
647 /* In this sample, ROWID will be the (virtual) row number, */
648 /* while ROWNUM will be the occurence rank in the multiple column. */
649 /***********************************************************************/
RowNumber(PGLOBAL,bool b)650 int TDBPIVOT::RowNumber(PGLOBAL, bool b)
651 {
652 return (b) ? M : N;
653 } // end of RowNumber
654
655 /***********************************************************************/
656 /* PIVOT Access Method opening routine. */
657 /***********************************************************************/
OpenDB(PGLOBAL g)658 bool TDBPIVOT::OpenDB(PGLOBAL g)
659 {
660 if (Use == USE_OPEN) {
661 /*******************************************************************/
662 /* Table already open, just replace it at its beginning. */
663 /*******************************************************************/
664 N = M = 0;
665 RowFlag = 0;
666 FileStatus = 0;
667 return FALSE;
668 } // endif use
669
670 if (Mode != MODE_READ) {
671 /*******************************************************************/
672 /* Currently PIVOT tables cannot be modified. */
673 /*******************************************************************/
674 sprintf(g->Message, MSG(TABLE_READ_ONLY), "PIVOT");
675 return TRUE;
676 } // endif Mode
677
678 if (To_Key_Col || To_Kindex) {
679 /*******************************************************************/
680 /* Direct access of PIVOT tables is not implemented yet. */
681 /*******************************************************************/
682 strcpy(g->Message, MSG(NO_PIV_DIR_ACC));
683 return TRUE;
684 } // endif To_Key_Col
685
686 /*********************************************************************/
687 /* Do it here if not done yet (should not be the case). */
688 /*********************************************************************/
689 if (GetSourceTable(g))
690 return TRUE;
691
692 // For tables, columns must be allocated before opening
693 if (MakePivotColumns(g))
694 return TRUE;
695
696 /*********************************************************************/
697 /* Physically open the object table. */
698 /*********************************************************************/
699 if (Tdbp->OpenDB(g))
700 return TRUE;
701
702 Use = USE_OPEN; // Do it now in case we are recursively called
703
704 /*********************************************************************/
705 /* Make all required pivot columns for object views. */
706 /*********************************************************************/
707 return MakeViewColumns(g);
708 } // end of OpenDB
709
710 /***********************************************************************/
711 /* Data Base read routine for PIVOT access method. */
712 /***********************************************************************/
ReadDB(PGLOBAL g)713 int TDBPIVOT::ReadDB(PGLOBAL g)
714 {
715 int rc = RC_OK;
716 bool newrow = FALSE;
717 PCOL colp;
718
719 if (FileStatus == 2)
720 return RC_EF;
721
722 if (FileStatus)
723 for (colp = Columns; colp; colp = colp->GetNext())
724 if (colp->GetAmType() == TYPE_AM_SRC)
725 ((PSRCCOL)colp)->SetColumn();
726
727 // New row, reset all function column values
728 for (colp = Columns; colp; colp = colp->GetNext())
729 if (colp->GetAmType() == TYPE_AM_FNC)
730 colp->GetValue()->Reset();
731
732 /*********************************************************************/
733 /* Now start the multi reading process. */
734 /*********************************************************************/
735 do {
736 if (RowFlag != 1) {
737 if ((rc = Tdbp->ReadDB(g)) != RC_OK) {
738 if (FileStatus && rc == RC_EF) {
739 // A prepared row remains to be sent
740 FileStatus = 2;
741 rc = RC_OK;
742 } // endif FileStatus
743
744 break;
745 } // endif rc
746
747 for (colp = Tdbp->GetColumns(); colp; colp = colp->GetNext())
748 colp->ReadColumn(g);
749
750 for (colp = Columns; colp; colp = colp->GetNext())
751 {
752 if (colp->GetAmType() == TYPE_AM_SRC)
753 {
754 if (FileStatus) {
755 if (((PSRCCOL)colp)->CompareLast()) {
756 newrow = (RowFlag) ? TRUE : FALSE;
757 break;
758 } // endif CompareLast
759
760 } else
761 ((PSRCCOL)colp)->SetColumn();
762 }
763 }
764 FileStatus = 1;
765 } // endif RowFlag
766
767 if (newrow) {
768 RowFlag = 1;
769 break;
770 } else
771 RowFlag = 2;
772
773 // Look for the column having this header
774 for (colp = Columns; colp; colp = colp->GetNext())
775 if (colp->GetAmType() == TYPE_AM_FNC) {
776 if (((PFNCCOL)colp)->CompareColumn())
777 break;
778
779 } // endif AmType
780
781 if (!colp && !(colp = Dcolp)) {
782 if (!Accept) {
783 strcpy(g->Message, MSG(NO_MATCH_COL));
784 return RC_FX;
785 } else
786 continue;
787
788 } // endif colp
789
790 // Set the value of the matching column from the fonction value
791 colp->GetValue()->SetValue_pval(Fcolp->GetValue());
792 } while (RowFlag == 2);
793
794 N++;
795 return rc;
796 } // end of ReadDB
797
798 /***********************************************************************/
799 /* WriteDB: Data Base write routine for PIVOT access methods. */
800 /***********************************************************************/
WriteDB(PGLOBAL g)801 int TDBPIVOT::WriteDB(PGLOBAL g)
802 {
803 sprintf(g->Message, MSG(TABLE_READ_ONLY), "PIVOT");
804 return RC_FX;
805 } // end of WriteDB
806
807 /***********************************************************************/
808 /* Data Base delete line routine for PIVOT access methods. */
809 /***********************************************************************/
DeleteDB(PGLOBAL g,int)810 int TDBPIVOT::DeleteDB(PGLOBAL g, int)
811 {
812 sprintf(g->Message, MSG(NO_TABLE_DEL), "PIVOT");
813 return RC_FX;
814 } // end of DeleteDB
815
816 /***********************************************************************/
817 /* Data Base close routine for PIVOT access method. */
818 /***********************************************************************/
CloseDB(PGLOBAL g)819 void TDBPIVOT::CloseDB(PGLOBAL g)
820 {
821 if (Tdbp)
822 Tdbp->CloseDB(g);
823
824 } // end of CloseDB
825
826 // ------------------------ FNCCOL functions ----------------------------
827
828 /***********************************************************************/
829 /* FNCCOL public constructor. */
830 /***********************************************************************/
FNCCOL(PCOLDEF cdp,PTDB tdbp,PCOL cprec,int i)831 FNCCOL::FNCCOL(PCOLDEF cdp, PTDB tdbp, PCOL cprec, int i)
832 : COLBLK(cdp, tdbp, i)
833 {
834 if (cprec) {
835 Next = cprec->GetNext();
836 cprec->SetNext(this);
837 } else {
838 Next = tdbp->GetColumns();
839 tdbp->SetColumns(this);
840 } // endif cprec
841
842 Value = NULL; // We'll get a new one later
843 Hval = NULL; // The unconverted header value
844 Xcolp = NULL;
845 } // end of FNCCOL constructor
846
847 /***********************************************************************/
848 /* FNCCOL initialization function. */
849 /***********************************************************************/
InitColumn(PGLOBAL g)850 bool FNCCOL::InitColumn(PGLOBAL g)
851 {
852 // Must have its own value block
853 if (InitValue(g))
854 return TRUE;
855
856 // Make a value from the column name
857 Hval = AllocateValue(g, Name, TYPE_STRING);
858 Hval->SetPrec(1); // Case insensitive
859
860 Xcolp = ((PTDBPIVOT)To_Tdb)->Xcolp;
861 AddStatus(BUF_READ); // All is done here
862 return FALSE;
863 } // end of InitColumn
864
865 /***********************************************************************/
866 /* CompareColumn: Compare column value with source column value. */
867 /***********************************************************************/
CompareColumn(void)868 bool FNCCOL::CompareColumn(void)
869 {
870 // Compare the unconverted values
871 return Hval->IsEqual(Xcolp->GetValue(), false);
872 } // end of CompareColumn
873
874 // ------------------------ SRCCOL functions ----------------------------
875
876 /***********************************************************************/
877 /* SRCCOL public constructor. */
878 /***********************************************************************/
SRCCOL(PCOLDEF cdp,PTDB tdbp,PCOL cprec,int n)879 SRCCOL::SRCCOL(PCOLDEF cdp, PTDB tdbp, PCOL cprec, int n)
880 : PRXCOL(cdp, tdbp, cprec, n)
881 {
882 } // end of SRCCOL constructor
883
884 /***********************************************************************/
885 /* Initialize the column as pointing to the source column. */
886 /***********************************************************************/
Init(PGLOBAL g,PTDB tp)887 bool SRCCOL::Init(PGLOBAL g, PTDB tp)
888 {
889 if (PRXCOL::Init(g, tp))
890 return true;
891
892 AddStatus(BUF_READ); // All is done here
893 return false;
894 } // end of SRCCOL constructor
895
896 /***********************************************************************/
897 /* SetColumn: have the column value set from the source column. */
898 /***********************************************************************/
SetColumn(void)899 void SRCCOL::SetColumn(void)
900 {
901 Value->SetValue_pval(To_Val);
902 } // end of SetColumn
903
904 /***********************************************************************/
905 /* SetColumn: Compare column value with source column value. */
906 /***********************************************************************/
CompareLast(void)907 bool SRCCOL::CompareLast(void)
908 {
909 // Compare the unconverted values
910 return !Value->IsEqual(To_Val, true);
911 } // end of CompareColumn
912
913 /* --------------------- End of TabPivot/TabQrs ---------------------- */
914