1 /************ TabPivot C++ Program Source Code File (.CPP) *************/
2 /* PROGRAM NAME: TABPIVOT                                              */
3 /* -------------                                                       */
4 /*  Version 1.7                                                        */
5 /*                                                                     */
6 /* COPYRIGHT:                                                          */
7 /* ----------                                                          */
8 /*  (C) Copyright to the author Olivier BERTRAND          2005-2017    */
9 /*                                                                     */
10 /* WHAT THIS PROGRAM DOES:                                             */
11 /* -----------------------                                             */
/*  This program contains the PIVOT classes DB execution routines.     */
13 /***********************************************************************/
14 
15 /***********************************************************************/
16 /*  Include relevant sections of the operating system header file.     */
17 /***********************************************************************/
18 #include "my_global.h"
19 #include "table.h"       // MySQL table definitions
20 #if defined(_WIN32)
21 #if defined(__BORLANDC__)
22 #define __MFC_COMPAT__                   // To define min/max as macro
23 #endif
24 //#include <windows.h>
25 #elif defined(UNIX)
26 #include <errno.h>
27 #include <unistd.h>
28 #include "osutil.h"
29 #else
30 #include <io.h>
31 #endif
32 
33 /***********************************************************************/
34 /*  Include application header files:                                  */
35 /*  global.h    is header containing all global declarations.          */
36 /*  plgdbsem.h  is header containing the DB application declarations.  */
37 /***********************************************************************/
38 #define FRM_VER 6
39 #include "sql_const.h"
40 #include "field.h"
41 #include "global.h"
42 #include "plgdbsem.h"
43 #include "xtable.h"
44 #include "tabext.h"
45 #include "tabcol.h"
46 #include "colblk.h"
47 #include "tabmysql.h"
48 #include "csort.h"
49 #include "tabutil.h"
50 #include "tabpivot.h"
51 #include "valblk.h"
52 #include "ha_connect.h"
53 
54 /***********************************************************************/
55 /*  Make the Pivot table column list.                                  */
56 /***********************************************************************/
PivotColumns(PGLOBAL g,const char * tab,const char * src,const char * picol,const char * fncol,const char * skcol,const char * host,const char * db,const char * user,const char * pwd,int port)57 PQRYRES PivotColumns(PGLOBAL g, const char *tab,   const char *src,
58                                 const char *picol, const char *fncol,
59                                 const char *skcol, const char *host,
60                                 const char *db,    const char *user,
61                                 const char *pwd,   int port)
62   {
63   PIVAID pvd(tab, src, picol, fncol, skcol, host, db, user, pwd, port);
64 
65   return pvd.MakePivotColumns(g);
66   } // end of PivotColumns
67 
/* --------------- Implementation of the PIVAID class ---------------- */
69 
70 /***********************************************************************/
71 /*  PIVAID constructor.                                                */
72 /***********************************************************************/
PIVAID(const char * tab,const char * src,const char * picol,const char * fncol,const char * skcol,const char * host,const char * db,const char * user,const char * pwd,int port)73 PIVAID::PIVAID(const char *tab,   const char *src,   const char *picol,
74                const char *fncol, const char *skcol, const char *host,
75                const char *db,    const char *user,  const char *pwd,
76                int port) : CSORT(false)
77   {
78   Host = (char*)host;
79   User = (char*)user;
80   Pwd = (char*)pwd;
81   Qryp = NULL;
82   Database = (char*)db;
83   Tabname = (char*)tab;
84   Tabsrc = (char*)src;
85   Picol = (char*)picol;
86   Fncol = (char*)fncol;
87   Skcol = (char*)skcol;
88   Rblkp = NULL;
89   Port = (port) ? port : GetDefaultPort();
90   } // end of PIVAID constructor
91 
92 /***********************************************************************/
93 /*  Skip columns that are in the skipped column list.                  */
94 /***********************************************************************/
SkipColumn(PCOLRES crp,char * skc)95 bool PIVAID::SkipColumn(PCOLRES crp, char *skc)
96   {
97   if (skc)
98     for (char *p = skc; *p; p += (strlen(p) + 1))
99       if (!stricmp(crp->Name, p))
100         return true;
101 
102   return false;
103   } // end of SkipColumn
104 
105 /***********************************************************************/
106 /*  Make the Pivot table column list.                                  */
107 /***********************************************************************/
MakePivotColumns(PGLOBAL g)108 PQRYRES PIVAID::MakePivotColumns(PGLOBAL g)
109 {
110 	char    *p, *query, *colname, *skc, buf[64];
111 	int      ndif, nblin, w = 0;
112 	bool     b = false;
113 	PVAL     valp;
114 	PQRYRES  qrp;
115 	PCOLRES *pcrp, crp, fncrp = NULL;
116 
117 	try {
118 		// Are there columns to skip?
119 		if (Skcol) {
120 			uint n = strlen(Skcol);
121 
122 			skc = (char*)PlugSubAlloc(g, NULL, n + 2);
123 			strcpy(skc, Skcol);
124 			skc[n + 1] = 0;
125 
126 			// Replace ; by nulls in skc
127 			for (p = strchr(skc, ';'); p; p = strchr(p, ';'))
128 				*p++ = 0;
129 
130 		} else
131 			skc = NULL;
132 
133 		if (!Tabsrc && Tabname) {
134 			// Locate the  query
135 			query = (char*)PlugSubAlloc(g, NULL, strlen(Tabname) + 26);
136 			sprintf(query, "SELECT * FROM `%s` LIMIT 1", Tabname);
137 		} else if (!Tabsrc) {
138 			strcpy(g->Message, MSG(SRC_TABLE_UNDEF));
139 			goto err;
140 		} else
141 			query = (char*)Tabsrc;
142 
143 		// Open a MySQL connection for this table
144 		if (!Myc.Open(g, Host, Database, User, Pwd, Port)) {
145 			b = true;
146 
147 			// Returned values must be in their original character set
148 			if (Myc.ExecSQL(g, "SET character_set_results=NULL", &w) == RC_FX)
149 				goto err;
150 			else
151 				Myc.FreeResult();
152 
153 		} else
154 			goto err;
155 
156 		// Send the source command to MySQL
157 		if (Myc.ExecSQL(g, query, &w) == RC_FX)
158 			goto err;
159 
160 		// We must have a storage query to get pivot column values
161 		if (!(Qryp = Myc.GetResult(g, true)))
162 			goto err;
163 
164 		if (!Fncol) {
165 			for (crp = Qryp->Colresp; crp; crp = crp->Next)
166 				if ((!Picol || stricmp(Picol, crp->Name)) && !SkipColumn(crp, skc))
167 					Fncol = crp->Name;
168 
169 			if (!Fncol) {
170 				strcpy(g->Message, MSG(NO_DEF_FNCCOL));
171 				goto err;
172 			} // endif Fncol
173 
174 		} // endif Fncol
175 
176 		if (!Picol) {
177 			// Find default Picol as the last one not equal to Fncol
178 			for (crp = Qryp->Colresp; crp; crp = crp->Next)
179 				if (stricmp(Fncol, crp->Name) && !SkipColumn(crp, skc))
180 					Picol = crp->Name;
181 
182 			if (!Picol) {
183 				strcpy(g->Message, MSG(NO_DEF_PIVOTCOL));
184 				goto err;
185 			} // endif Picol
186 
187 		} // endif picol
188 
189 	  // Prepare the column list
190 		for (pcrp = &Qryp->Colresp; (crp = *pcrp); ) {
191 			if (SkipColumn(crp, skc)) {
192 				// Ignore this column
193 				*pcrp = crp->Next;
194 			} else if (!stricmp(Picol, crp->Name)) {
195 				if (crp->Nulls) {
196 					sprintf(g->Message, "Pivot column %s cannot be nullable", Picol);
197 					goto err;
198 				} // endif Nulls
199 
200 				Rblkp = crp->Kdata;
201 				*pcrp = crp->Next;
202 			} else if (!stricmp(Fncol, crp->Name)) {
203 				fncrp = crp;
204 				*pcrp = crp->Next;
205 			} else
206 				pcrp = &crp->Next;
207 		}
208 		if (!Rblkp) {
209 			strcpy(g->Message, MSG(NO_DEF_PIVOTCOL));
210 			goto err;
211 		} else if (!fncrp) {
212 			strcpy(g->Message, MSG(NO_DEF_FNCCOL));
213 			goto err;
214 		} // endif
215 
216 		if (Tabsrc) {
217 			Myc.Close();
218 			b = false;
219 
220 			// Before calling sort, initialize all
221 			nblin = Qryp->Nblin;
222 
223 			Index.Size = nblin * sizeof(int);
224 			Index.Sub = TRUE;                  // Should be small enough
225 
226 			if (!PlgDBalloc(g, NULL, Index))
227 				goto err;
228 
229 			Offset.Size = (nblin + 1) * sizeof(int);
230 			Offset.Sub = TRUE;                 // Should be small enough
231 
232 			if (!PlgDBalloc(g, NULL, Offset))
233 				goto err;
234 
235 			ndif = Qsort(g, nblin);
236 
237 			if (ndif < 0)           // error
238 				goto err;
239 
240 		} else {
241 			// The query was limited, we must get pivot column values
242 			// Returned values must be in their original character set
243 	    //  if (Myc.ExecSQL(g, "SET character_set_results=NULL", &w) == RC_FX)
244 	    //    goto err;
245 
246 			query = (char*)PlugSubAlloc(g, NULL, 0);
247 			sprintf(query, "SELECT DISTINCT `%s` FROM `%s`", Picol, Tabname);
248 			PlugSubAlloc(g, NULL, strlen(query) + 1);
249 			Myc.FreeResult();
250 
251 			// Send the source command to MySQL
252 			if (Myc.ExecSQL(g, query, &w) == RC_FX)
253 				goto err;
254 
255 			// We must have a storage query to get pivot column values
256 			if (!(qrp = Myc.GetResult(g, true)))
257 				goto err;
258 
259 			Myc.Close();
260 			b = false;
261 
262 			// Get the column list
263 			crp = qrp->Colresp;
264 			Rblkp = crp->Kdata;
265 			ndif = qrp->Nblin;
266 		} // endif Tabsrc
267 
268 		// Allocate the Value used to retieve column names
269 		if (!(valp = AllocateValue(g, Rblkp->GetType(),
270 				                          Rblkp->GetVlen(),
271 				                          Rblkp->GetPrec())))
272 			goto err;
273 
274 		// Now make the functional columns
275 		for (int i = 0; i < ndif; i++) {
276 			if (i) {
277 				crp = (PCOLRES)PlugSubAlloc(g, NULL, sizeof(COLRES));
278 				memcpy(crp, fncrp, sizeof(COLRES));
279 			} else
280 				crp = fncrp;
281 
282 			// Get the value that will be the generated column name
283 			if (Tabsrc)
284 				valp->SetValue_pvblk(Rblkp, Pex[Pof[i]]);
285 			else
286 				valp->SetValue_pvblk(Rblkp, i);
287 
288 			colname = valp->GetCharString(buf);
289 			crp->Name = PlugDup(g, colname);
290 			crp->Flag = 1;
291 
292 			// Add this column
293 			*pcrp = crp;
294 			crp->Next = NULL;
295 			pcrp = &crp->Next;
296 		} // endfor i
297 
298 		// We added ndif columns and removed 2 (picol and fncol)
299 		Qryp->Nbcol += (ndif - 2);
300 		return Qryp;
301 	} catch (int n) {
302 		if (trace(1))
303 			htrc("Exception %d: %s\n", n, g->Message);
304 	} catch (const char *msg) {
305 		strcpy(g->Message, msg);
306 	} // end catch
307 
308 err:
309 	if (b)
310 		Myc.Close();
311 
312 	return NULL;
313 } // end of MakePivotColumns
314 
315 /***********************************************************************/
316 /*  PIVAID: Compare routine for sorting pivot column values.           */
317 /***********************************************************************/
Qcompare(int * i1,int * i2)318 int PIVAID::Qcompare(int *i1, int *i2)
319   {
320   // TODO: the actual comparison between pivot column result values.
321   return Rblkp->CompVal(*i1, *i2);
322   } // end of Qcompare
323 
324 /* --------------- Implementation of the PIVOT classes --------------- */
325 
326 /***********************************************************************/
327 /*  PIVOTDEF constructor.                                              */
328 /***********************************************************************/
PIVOTDEF(void)329   PIVOTDEF::PIVOTDEF(void)
330   {
331   Host = User = Pwd = DB = NULL;
332   Tabname = Tabsrc = Picol = Fncol = Function = NULL;
333   GBdone = Accept = false;
334   Port = 0;
335   } // end of PIVOTDEF constructor
336 
337 /***********************************************************************/
338 /*  DefineAM: define specific AM block values from PIVOT table.        */
339 /***********************************************************************/
DefineAM(PGLOBAL g,LPCSTR am,int poff)340 bool PIVOTDEF::DefineAM(PGLOBAL g, LPCSTR am, int poff)
341   {
342   char *p1, *p2;
343 
344   if (PRXDEF::DefineAM(g, am, poff))
345     return TRUE;
346 
347   Tabname = (char*)Tablep->GetName();
348   DB = (char*)Tablep->GetSchema();
349   Tabsrc = (char*)Tablep->GetSrc();
350 
351   Host = GetStringCatInfo(g, "Host", "localhost");
352   User = GetStringCatInfo(g, "User", "*");
353   Pwd = GetStringCatInfo(g, "Password", NULL);
354   Picol = GetStringCatInfo(g, "PivotCol", NULL);
355   Fncol = GetStringCatInfo(g, "FncCol", NULL);
356 
357   // If fncol is like avg(colname), separate Fncol and Function
358   if (Fncol && (p1 = strchr(Fncol, '(')) && (p2 = strchr(p1, ')')) &&
359       (*Fncol != '"') &&  (!*(p2+1))) {
360     *p1++ = '\0'; *p2 = '\0';
361     Function = Fncol;
362     Fncol = p1;
363   } else
364     Function = GetStringCatInfo(g, "Function", "SUM");
365 
366   GBdone = GetBoolCatInfo("Groupby", false);
367   Accept = GetBoolCatInfo("Accept", false);
368   Port = GetIntCatInfo("Port", 3306);
369   Desc = (Tabsrc) ? Tabsrc : Tabname;
370   return FALSE;
371   } // end of DefineAM
372 
373 /***********************************************************************/
374 /*  GetTable: makes a new TDB of the proper type.                      */
375 /***********************************************************************/
GetTable(PGLOBAL g,MODE)376 PTDB PIVOTDEF::GetTable(PGLOBAL g, MODE)
377   {
378   return new(g) TDBPIVOT(this);
379   } // end of GetTable
380 
381 /* ------------------------------------------------------------------- */
382 
383 /***********************************************************************/
384 /*  Implementation of the TDBPIVOT class.                              */
385 /***********************************************************************/
TDBPIVOT(PPIVOTDEF tdp)386 TDBPIVOT::TDBPIVOT(PPIVOTDEF tdp) : TDBPRX(tdp)
387   {
388   Host = tdp->Host;
389   Database = tdp->DB;
390   User = tdp->User;
391   Pwd = tdp->Pwd;
392   Port = tdp->Port;
393   Tabname = tdp->Tabname;    // Name of source table
394   Tabsrc = tdp->Tabsrc;      // SQL description of source table
395   Picol = tdp->Picol;        // Pivot column name
396   Fncol = tdp->Fncol;        // Function column name
397   Function = tdp->Function;  // Aggregate function name
398   Xcolp = NULL;              // To the FNCCOL column
399 //Xresp = NULL;              // To the pivot result column
400 //Rblkp = NULL;              // The value block of the pivot column
401   Fcolp = NULL;              // To the function column
402   Dcolp = NULL;              // To the dump column
403   GBdone = tdp->GBdone;
404   Accept = tdp->Accept;
405   Mult = -1;                // Estimated table size
406   N = 0;                    // The current table index
407   M = 0;                    // The occurrence rank
408   FileStatus = 0;           // Logical End-of-File
409   RowFlag = 0;              // 0: Ok, 1: Same, 2: Skip
410   } // end of TDBPIVOT constructor
411 
412 /***********************************************************************/
413 /*  Allocate source column description block.                          */
414 /***********************************************************************/
MakeCol(PGLOBAL g,PCOLDEF cdp,PCOL cprec,int n)415 PCOL TDBPIVOT::MakeCol(PGLOBAL g, PCOLDEF cdp, PCOL cprec, int n)
416   {
417   PCOL colp;
418 
419   if (cdp->GetOffset()) {
420     colp = new(g) FNCCOL(cdp, this, cprec, n);
421 
422     if (cdp->GetOffset() > 1)
423       Dcolp = colp;
424 
425   } else
426     colp = new(g) SRCCOL(cdp, this, cprec, n);
427 
428   return colp;
429   } // end of MakeCol
430 
431 /***********************************************************************/
432 /*  Find default fonction and pivot columns.                           */
433 /***********************************************************************/
FindDefaultColumns(PGLOBAL g)434 bool TDBPIVOT::FindDefaultColumns(PGLOBAL g)
435   {
436   PCOLDEF cdp;
437   PTABDEF defp = Tdbp->GetDef();
438 
439   if (!Fncol) {
440     for (cdp = defp->GetCols(); cdp; cdp = cdp->GetNext())
441       if (!Picol || stricmp(Picol, cdp->GetName()))
442         Fncol = cdp->GetName();
443 
444     if (!Fncol) {
445       strcpy(g->Message, MSG(NO_DEF_FNCCOL));
446       return true;
447       } // endif Fncol
448 
449     } // endif Fncol
450 
451   if (!Picol) {
452     // Find default Picol as the last one not equal to Fncol
453     for (cdp = defp->GetCols(); cdp; cdp = cdp->GetNext())
454       if (stricmp(Fncol, cdp->GetName()))
455         Picol = cdp->GetName();
456 
457     if (!Picol) {
458       strcpy(g->Message, MSG(NO_DEF_PIVOTCOL));
459       return true;
460       } // endif Picol
461 
462     } // endif Picol
463 
464   return false;
465   } // end of FindDefaultColumns
466 
467 /***********************************************************************/
468 /*  Prepare the source table Query.                                    */
469 /***********************************************************************/
GetSourceTable(PGLOBAL g)470 bool TDBPIVOT::GetSourceTable(PGLOBAL g)
471   {
472   if (Tdbp)
473     return false;             // Already done
474 
475   if (!Tabsrc && Tabname) {
476     // Get the table description block of this table
477     if (!(Tdbp = GetSubTable(g, ((PPIVOTDEF)To_Def)->Tablep, true)))
478       return true;
479 
480     if (!GBdone) {
481       char   *colist;
482       PCOLDEF cdp;
483 
484       if (FindDefaultColumns(g))
485         return true;
486 
487       // Locate the suballocated colist (size is not known yet)
488       *(colist = (char*)PlugSubAlloc(g, NULL, 0)) = 0;
489 
490       // Make the column list
491       for (cdp = To_Def->GetCols(); cdp; cdp = cdp->GetNext())
492         if (!cdp->GetOffset())
493           strcat(strcat(colist, cdp->GetName()), ", ");
494 
495       // Add the Pivot column at the end of the list
496       strcat(colist, Picol);
497 
498       // Now we know how much was suballocated
499       PlugSubAlloc(g, NULL, strlen(colist) + 1);
500 
501       // Locate the source string (size is not known yet)
502       Tabsrc = (char*)PlugSubAlloc(g, NULL, 0);
503 
504       // Start making the definition
505       strcat(strcat(strcpy(Tabsrc, "SELECT "), colist), ", ");
506 
507       // Make it suitable for Pivot by doing the group by
508       strcat(strcat(Tabsrc, Function), "(");
509       strcat(strcat(strcat(Tabsrc, Fncol), ") "), Fncol);
510       strcat(strcat(Tabsrc, " FROM "), Tabname);
511       strcat(strcat(Tabsrc, " GROUP BY "), colist);
512 
513       if (Tdbp->IsView())     // Until MariaDB bug is fixed
514         strcat(strcat(Tabsrc, " ORDER BY "), colist);
515 
516       // Now we know how much was suballocated
517       PlugSubAlloc(g, NULL, strlen(Tabsrc) + 1);
518       } // endif !GBdone
519 
520   } else if (!Tabsrc) {
521     strcpy(g->Message, MSG(SRC_TABLE_UNDEF));
522     return true;
523   } // endif
524 
525   if (Tabsrc) {
526     // Get the new table description block of this source table
527     PTABLE tablep = new(g) XTAB("whatever", Tabsrc);
528 
529     tablep->SetSchema(Database);
530 
531     if (!(Tdbp = GetSubTable(g, tablep, true)))
532       return true;
533 
534     } // endif Tabsrc
535 
536   return false;
537   } // end of GetSourceTable
538 
539 /***********************************************************************/
540 /*  Make the required pivot columns.                                   */
541 /***********************************************************************/
MakePivotColumns(PGLOBAL g)542 bool TDBPIVOT::MakePivotColumns(PGLOBAL g)
543   {
544   if (!Tdbp->IsView()) {
545     // This was not done yet if GBdone is true
546     if (FindDefaultColumns(g))
547       return true;
548 
549     // Now it is time to allocate the pivot and function columns
550     if (!(Fcolp = Tdbp->ColDB(g, Fncol, 0))) {
551       // Function column not found in table
552       sprintf(g->Message, MSG(COL_ISNOT_TABLE), Fncol, Tabname);
553       return true;
554     } else if (Fcolp->InitValue(g))
555       return true;
556 
557     if (!(Xcolp = Tdbp->ColDB(g, Picol, 0))) {
558       // Pivot column not found in table
559       sprintf(g->Message, MSG(COL_ISNOT_TABLE), Picol, Tabname);
560       return true;
561     } else if (Xcolp->InitValue(g))
562       return true;
563 
564     //  Check and initialize the subtable columns
565     for (PCOL cp = Columns; cp; cp = cp->GetNext())
566       if (cp->GetAmType() == TYPE_AM_SRC) {
567         if (((PSRCCOL)cp)->Init(g, NULL))
568           return TRUE;
569 
570       } else if (cp->GetAmType() == TYPE_AM_FNC)
571         if (((PFNCCOL)cp)->InitColumn(g))
572           return TRUE;
573 
574     } // endif isview
575 
576   return false;
577   } // end of MakePivotColumns
578 
579 /***********************************************************************/
580 /*  Make the required pivot columns for an object view.                */
581 /***********************************************************************/
MakeViewColumns(PGLOBAL g)582 bool TDBPIVOT::MakeViewColumns(PGLOBAL g)
583   {
584   if (Tdbp->IsView()) {
585     // Tdbp is a view ColDB cannot be used
586     PCOL   colp, cp;
587     PTDBMY tdbp;
588 
589     if (Tdbp->GetAmType() != TYPE_AM_MYSQL) {
590       strcpy(g->Message, "View is not MySQL");
591       return true;
592     } else
593       tdbp = (PTDBMY)Tdbp;
594 
595     if (!Fncol && !(Fncol = tdbp->FindFieldColumn(Picol))) {
596       strcpy(g->Message, MSG(NO_DEF_FNCCOL));
597       return true;
598       } // endif Fncol
599 
600     if (!Picol && !(Picol = tdbp->FindFieldColumn(Fncol))) {
601       strcpy(g->Message, MSG(NO_DEF_PIVOTCOL));
602       return true;
603       } // endif Picol
604 
605     // Now it is time to allocate the pivot and function columns
606     if (!(Fcolp = tdbp->MakeFieldColumn(g, Fncol)))
607     	return true;
608 
609     if (!(Xcolp = tdbp->MakeFieldColumn(g, Picol)))
610     	return true;
611 
612     //  Check and initialize the subtable columns
613     for (cp = Columns; cp; cp = cp->GetNext())
614       if (cp->GetAmType() == TYPE_AM_SRC) {
615         if ((colp = tdbp->MakeFieldColumn(g, cp->GetName()))) {
616           ((PSRCCOL)cp)->Colp = colp;
617           ((PSRCCOL)cp)->To_Val = colp->GetValue();
618           cp->AddStatus(BUF_READ);     // All is done here
619         } else
620     			return true;
621 
622       } else if (cp->GetAmType() == TYPE_AM_FNC)
623         if (((PFNCCOL)cp)->InitColumn(g))
624           return TRUE;
625 
626     } // endif isview
627 
628   return false;
629   } // end of MakeViewColumns
630 
631 /***********************************************************************/
632 /*  PIVOT GetMaxSize: returns the maximum number of rows in the table. */
633 /***********************************************************************/
GetMaxSize(PGLOBAL g)634 int TDBPIVOT::GetMaxSize(PGLOBAL g __attribute__((unused)))
635   {
636 #if  0
637   if (MaxSize < 0)
638     MaxSize = MakePivotColumns(g);
639 
640   return MaxSize;
641 #endif // 0
642   return 10;
643   } // end of GetMaxSize
644 
645 /***********************************************************************/
646 /*  In this sample, ROWID will be the (virtual) row number,            */
647 /*  while ROWNUM will be the occurrence rank in the multiple column.    */
648 /***********************************************************************/
RowNumber(PGLOBAL,bool b)649 int TDBPIVOT::RowNumber(PGLOBAL, bool b)
650   {
651   return (b) ? M : N;
652   } // end of RowNumber
653 
654 /***********************************************************************/
655 /*  PIVOT Access Method opening routine.                               */
656 /***********************************************************************/
OpenDB(PGLOBAL g)657 bool TDBPIVOT::OpenDB(PGLOBAL g)
658   {
659   if (Use == USE_OPEN) {
660     /*******************************************************************/
661     /*  Table already open, just replace it at its beginning.          */
662     /*******************************************************************/
663     N = M = 0;
664     RowFlag = 0;
665     FileStatus = 0;
666     return FALSE;
667     } // endif use
668 
669   if (Mode != MODE_READ) {
670     /*******************************************************************/
671     /* Currently PIVOT tables cannot be modified.                      */
672     /*******************************************************************/
673     sprintf(g->Message, MSG(TABLE_READ_ONLY), "PIVOT");
674     return TRUE;
675     } // endif Mode
676 
677   if (To_Key_Col || To_Kindex) {
678     /*******************************************************************/
679     /* Direct access of PIVOT tables is not implemented yet.           */
680     /*******************************************************************/
681     strcpy(g->Message, MSG(NO_PIV_DIR_ACC));
682     return TRUE;
683     } // endif To_Key_Col
684 
685   /*********************************************************************/
686   /*  Do it here if not done yet (should not be the case).             */
687   /*********************************************************************/
688   if (GetSourceTable(g))
689     return TRUE;
690 
691   // For tables, columns must be allocated before opening
692   if (MakePivotColumns(g))
693     return TRUE;
694 
695   /*********************************************************************/
696   /*  Physically open the object table.                                */
697   /*********************************************************************/
698 	if (Tdbp->OpenDB(g))
699 		return TRUE;
700 
701   Use = USE_OPEN;       // Do it now in case we are recursively called
702 
703   /*********************************************************************/
704   /*  Make all required pivot columns for object views.                */
705   /*********************************************************************/
706   return MakeViewColumns(g);
707   } // end of OpenDB
708 
/***********************************************************************/
/*  Data Base read routine for PIVOT access method.                    */
/***********************************************************************/
// Builds one pivot row by reading source rows until a source (SRC)
// column value changes or the source table ends.
// State used across calls, as set by this function:
//   FileStatus  0: no source row read yet, 1: at least one was read,
//               2: the source is exhausted and the last row was sent.
//   RowFlag     1: the last source row read begins the next pivot row
//               and is reused by the next call instead of reading,
//               2: the source row belongs to the row being built.
// Returns RC_OK when a row is ready, RC_EF at end of file, RC_FX when
// a pivot value matches no column and Accept is false, or else the
// return code of the source table ReadDB.
int TDBPIVOT::ReadDB(PGLOBAL g)
  {
  int  rc = RC_OK;
  bool newrow = FALSE;
  PCOL colp;

  if (FileStatus == 2)
    return RC_EF;

  // Not the first call: set the source columns from the current
  // source values
  if (FileStatus)
    for (colp = Columns; colp; colp = colp->GetNext())
      if (colp->GetAmType() == TYPE_AM_SRC)
        ((PSRCCOL)colp)->SetColumn();

  // New row, reset all function column values
  for (colp = Columns; colp; colp = colp->GetNext())
    if (colp->GetAmType() == TYPE_AM_FNC)
      colp->GetValue()->Reset();

  /*********************************************************************/
  /*  Now start the multi reading process.                             */
  /*********************************************************************/
  do {
    // When RowFlag is 1 the source row of the previous call is reused
    if (RowFlag != 1) {
      if ((rc = Tdbp->ReadDB(g)) != RC_OK) {
        if (FileStatus && rc == RC_EF) {
          // A prepared row remains to be sent
          FileStatus = 2;
          rc = RC_OK;
          } // endif FileStatus

        break;
        } // endif rc

      // Read all the columns of the source table
      for (colp = Tdbp->GetColumns(); colp; colp = colp->GetNext())
        colp->ReadColumn(g);

      for (colp = Columns; colp; colp = colp->GetNext())
      {
        if (colp->GetAmType() == TYPE_AM_SRC)
        {
          if (FileStatus) {
            // A source column that changed ends the row being built
            if (((PSRCCOL)colp)->CompareLast()) {
              newrow = (RowFlag) ? TRUE : FALSE;
              break;
              } // endif CompareLast

          } else
            // First source row: just take its values
            ((PSRCCOL)colp)->SetColumn();
        }
      }
      FileStatus = 1;
      } // endif RowFlag

    if (newrow) {
      // Send the built row, the row just read is kept for the next call
      RowFlag = 1;
      break;
    } else
      RowFlag = 2;

    // Look for the column having this header
    for (colp = Columns; colp; colp = colp->GetNext())
      if (colp->GetAmType() == TYPE_AM_FNC) {
        if (((PFNCCOL)colp)->CompareColumn())
          break;

        } // endif AmType

    // No matching column: use Dcolp if there is one
    if (!colp && !(colp = Dcolp)) {
      if (!Accept) {
        strcpy(g->Message, MSG(NO_MATCH_COL));
        return RC_FX;
      } else
        continue;               // Ignore this source row

      } // endif colp

    // Set the value of the matching column from the function value
    colp->GetValue()->SetValue_pval(Fcolp->GetValue());
    } while (RowFlag == 2);

  N++;
  return rc;
  } // end of ReadDB
796 
797 /***********************************************************************/
798 /*  WriteDB: Data Base write routine for PIVOT access methods.         */
799 /***********************************************************************/
WriteDB(PGLOBAL g)800 int TDBPIVOT::WriteDB(PGLOBAL g)
801   {
802   sprintf(g->Message, MSG(TABLE_READ_ONLY), "PIVOT");
803   return RC_FX;
804   } // end of WriteDB
805 
806 /***********************************************************************/
807 /*  Data Base delete line routine for PIVOT access methods.            */
808 /***********************************************************************/
DeleteDB(PGLOBAL g,int)809 int TDBPIVOT::DeleteDB(PGLOBAL g, int)
810   {
811   sprintf(g->Message, MSG(NO_TABLE_DEL), "PIVOT");
812   return RC_FX;
813   } // end of DeleteDB
814 
815 /***********************************************************************/
816 /*  Data Base close routine for PIVOT access method.                   */
817 /***********************************************************************/
CloseDB(PGLOBAL g)818 void TDBPIVOT::CloseDB(PGLOBAL g)
819   {
820   if (Tdbp)
821     Tdbp->CloseDB(g);
822 
823   } // end of CloseDB
824 
825 // ------------------------ FNCCOL functions ----------------------------
826 
827 /***********************************************************************/
828 /*  FNCCOL public constructor.                                       */
829 /***********************************************************************/
FNCCOL(PCOLDEF cdp,PTDB tdbp,PCOL cprec,int i)830 FNCCOL::FNCCOL(PCOLDEF cdp, PTDB tdbp, PCOL cprec, int i)
831       : COLBLK(cdp, tdbp, i)
832   {
833   if (cprec) {
834     Next = cprec->GetNext();
835     cprec->SetNext(this);
836   } else {
837     Next = tdbp->GetColumns();
838     tdbp->SetColumns(this);
839   } // endif cprec
840 
841   Value = NULL;     // We'll get a new one later
842   Hval = NULL;      // The unconverted header value
843   Xcolp = NULL;
844   } // end of FNCCOL constructor
845 
846 /***********************************************************************/
847 /*  FNCCOL initialization function.                                    */
848 /***********************************************************************/
InitColumn(PGLOBAL g)849 bool FNCCOL::InitColumn(PGLOBAL g)
850 {
851   // Must have its own value block
852   if (InitValue(g))
853     return TRUE;
854 
855   // Make a value from the column name
856   Hval = AllocateValue(g, Name, TYPE_STRING);
857   Hval->SetPrec(1);         // Case insensitive
858 
859   Xcolp = ((PTDBPIVOT)To_Tdb)->Xcolp;
860   AddStatus(BUF_READ);      // All is done here
861   return FALSE;
862 } // end of InitColumn
863 
864 /***********************************************************************/
865 /*  CompareColumn: Compare column value with source column value.      */
866 /***********************************************************************/
CompareColumn(void)867 bool FNCCOL::CompareColumn(void)
868   {
869   // Compare the unconverted values
870   return Hval->IsEqual(Xcolp->GetValue(), false);
871   } // end of CompareColumn
872 
873 // ------------------------ SRCCOL functions ----------------------------
874 
875 /***********************************************************************/
876 /*  SRCCOL public constructor.                                         */
877 /***********************************************************************/
SRCCOL(PCOLDEF cdp,PTDB tdbp,PCOL cprec,int n)878 SRCCOL::SRCCOL(PCOLDEF cdp, PTDB tdbp, PCOL cprec, int n)
879       : PRXCOL(cdp, tdbp, cprec, n)
880   {
881   } // end of SRCCOL constructor
882 
883 /***********************************************************************/
884 /*  Initialize the column as pointing to the source column.            */
885 /***********************************************************************/
Init(PGLOBAL g,PTDB tp)886 bool SRCCOL::Init(PGLOBAL g, PTDB tp)
887   {
888   if (PRXCOL::Init(g, tp))
889     return true;
890 
891   AddStatus(BUF_READ);     // All is done here
892   return false;
893   } // end of SRCCOL constructor
894 
895 /***********************************************************************/
896 /*  SetColumn: have the column value set from the source column.       */
897 /***********************************************************************/
SetColumn(void)898 void SRCCOL::SetColumn(void)
899   {
900   Value->SetValue_pval(To_Val);
901   } // end of SetColumn
902 
903 /***********************************************************************/
904 /*  SetColumn: Compare column value with source column value.          */
905 /***********************************************************************/
CompareLast(void)906 bool SRCCOL::CompareLast(void)
907   {
908   // Compare the unconverted values
909   return !Value->IsEqual(To_Val, true);
910   } // end of CompareColumn
911 
912 /* --------------------- End of TabPivot/TabQrs ---------------------- */
913