1 /************ TabPivot C++ Program Source Code File (.CPP) *************/
2 /* PROGRAM NAME: TABPIVOT                                              */
3 /* -------------                                                       */
4 /*  Version 1.7                                                        */
5 /*                                                                     */
6 /* COPYRIGHT:                                                          */
7 /* ----------                                                          */
8 /*  (C) Copyright to the author Olivier BERTRAND          2005-2017    */
9 /*                                                                     */
10 /* WHAT THIS PROGRAM DOES:                                             */
11 /* -----------------------                                             */
/*  This program contains the PIVOT classes DB execution routines.     */
13 /***********************************************************************/
14 
15 /***********************************************************************/
16 /*  Include relevant sections of the operating system header file.     */
17 /***********************************************************************/
18 #include "my_global.h"
19 #include "table.h"       // MySQL table definitions
20 #if defined(_WIN32)
21 #if defined(__BORLANDC__)
22 #define __MFC_COMPAT__                   // To define min/max as macro
23 #endif
24 //#include <windows.h>
25 #elif defined(UNIX)
26 #include <errno.h>
27 #include <unistd.h>
28 #include "osutil.h"
29 #else
30 #include <io.h>
31 #endif
32 
33 /***********************************************************************/
34 /*  Include application header files:                                  */
35 /*  global.h    is header containing all global declarations.          */
36 /*  plgdbsem.h  is header containing the DB application declarations.  */
37 /***********************************************************************/
38 #define FRM_VER 6
39 #include "sql_const.h"
40 #include "field.h"
41 #include "global.h"
42 #include "plgdbsem.h"
43 #include "xtable.h"
44 #include "tabext.h"
45 #include "tabcol.h"
46 #include "colblk.h"
47 #include "tabmysql.h"
48 #include "csort.h"
49 #include "tabutil.h"
50 #include "tabpivot.h"
51 #include "valblk.h"
52 #include "ha_connect.h"
53 
54 /***********************************************************************/
55 /*  Make the Pivot table column list.                                  */
56 /***********************************************************************/
PivotColumns(PGLOBAL g,const char * tab,const char * src,const char * picol,const char * fncol,const char * skcol,const char * host,const char * db,const char * user,const char * pwd,int port)57 PQRYRES PivotColumns(PGLOBAL g, const char *tab,   const char *src,
58                                 const char *picol, const char *fncol,
59                                 const char *skcol, const char *host,
60                                 const char *db,    const char *user,
61                                 const char *pwd,   int port)
62   {
63   PIVAID pvd(tab, src, picol, fncol, skcol, host, db, user, pwd, port);
64 
65   return pvd.MakePivotColumns(g);
66   } // end of PivotColumns
67 
/* --------------- Implementation of the PIVAID class ---------------- */
69 
70 /***********************************************************************/
71 /*  PIVAID constructor.                                                */
72 /***********************************************************************/
PIVAID(const char * tab,const char * src,const char * picol,const char * fncol,const char * skcol,const char * host,const char * db,const char * user,const char * pwd,int port)73 PIVAID::PIVAID(const char *tab,   const char *src,   const char *picol,
74                const char *fncol, const char *skcol, const char *host,
75                const char *db,    const char *user,  const char *pwd,
76                int port) : CSORT(false)
77   {
78   Host = (char*)host;
79   User = (char*)user;
80   Pwd = (char*)pwd;
81   Qryp = NULL;
82   Database = (char*)db;
83   Tabname = (char*)tab;
84   Tabsrc = (char*)src;
85   Picol = (char*)picol;
86   Fncol = (char*)fncol;
87   Skcol = (char*)skcol;
88   Rblkp = NULL;
89   Port = (port) ? port : GetDefaultPort();
90   } // end of PIVAID constructor
91 
92 /***********************************************************************/
93 /*  Skip columns that are in the skipped column list.                  */
94 /***********************************************************************/
SkipColumn(PCOLRES crp,char * skc)95 bool PIVAID::SkipColumn(PCOLRES crp, char *skc)
96   {
97   if (skc)
98     for (char *p = skc; *p; p += (strlen(p) + 1))
99       if (!stricmp(crp->Name, p))
100         return true;
101 
102   return false;
103   } // end of SkipColumn
104 
105 /***********************************************************************/
106 /*  Make the Pivot table column list.                                  */
107 /***********************************************************************/
MakePivotColumns(PGLOBAL g)108 PQRYRES PIVAID::MakePivotColumns(PGLOBAL g)
109 {
110 	char    *p, *query, *colname, *skc, buf[64];
111 	int      ndif, nblin, w = 0;
112 	bool     b = false;
113 	PVAL     valp;
114 	PQRYRES  qrp;
115 	PCOLRES *pcrp, crp, fncrp = NULL;
116 
117 	try {
118 		// Are there columns to skip?
119 		if (Skcol) {
120 			uint n = strlen(Skcol);
121 
122 			skc = (char*)PlugSubAlloc(g, NULL, n + 2);
123 			strcpy(skc, Skcol);
124 			skc[n + 1] = 0;
125 
126 			// Replace ; by nulls in skc
127 			for (p = strchr(skc, ';'); p; p = strchr(p, ';'))
128 				*p++ = 0;
129 
130 		} else
131 			skc = NULL;
132 
133 		if (!Tabsrc && Tabname) {
134 			// Locate the  query
135 			query = (char*)PlugSubAlloc(g, NULL, strlen(Tabname) + 26);
136 			sprintf(query, "SELECT * FROM `%s` LIMIT 1", Tabname);
137 		} else if (!Tabsrc) {
138 			strcpy(g->Message, MSG(SRC_TABLE_UNDEF));
139 			goto err;
140 		} else
141 			query = (char*)Tabsrc;
142 
143 		// Open a MySQL connection for this table
144 		if (!Myc.Open(g, Host, Database, User, Pwd, Port)) {
145 			b = true;
146 
147 			// Returned values must be in their original character set
148 			if (Myc.ExecSQL(g, "SET character_set_results=NULL", &w) == RC_FX)
149 				goto err;
150 			else
151 				Myc.FreeResult();
152 
153 		} else
154 			goto err;
155 
156 		// Send the source command to MySQL
157 		if (Myc.ExecSQL(g, query, &w) == RC_FX)
158 			goto err;
159 
160 		// We must have a storage query to get pivot column values
161 		if (!(Qryp = Myc.GetResult(g, true)))
162 			goto err;
163 
164 		if (!Fncol) {
165 			for (crp = Qryp->Colresp; crp; crp = crp->Next)
166 				if ((!Picol || stricmp(Picol, crp->Name)) && !SkipColumn(crp, skc))
167 					Fncol = crp->Name;
168 
169 			if (!Fncol) {
170 				strcpy(g->Message, MSG(NO_DEF_FNCCOL));
171 				goto err;
172 			} // endif Fncol
173 
174 		} // endif Fncol
175 
176 		if (!Picol) {
177 			// Find default Picol as the last one not equal to Fncol
178 			for (crp = Qryp->Colresp; crp; crp = crp->Next)
179 				if (stricmp(Fncol, crp->Name) && !SkipColumn(crp, skc))
180 					Picol = crp->Name;
181 
182 			if (!Picol) {
183 				strcpy(g->Message, MSG(NO_DEF_PIVOTCOL));
184 				goto err;
185 			} // endif Picol
186 
187 		} // endif picol
188 
189 	  // Prepare the column list
190 		for (pcrp = &Qryp->Colresp; (crp = *pcrp); )
191 			if (SkipColumn(crp, skc)) {
192 				// Ignore this column
193 				*pcrp = crp->Next;
194 			} else if (!stricmp(Picol, crp->Name)) {
195 				if (crp->Nulls) {
196 					sprintf(g->Message, "Pivot column %s cannot be nullable", Picol);
197 					goto err;
198 				} // endif Nulls
199 
200 				Rblkp = crp->Kdata;
201 				*pcrp = crp->Next;
202 			} else if (!stricmp(Fncol, crp->Name)) {
203 				fncrp = crp;
204 				*pcrp = crp->Next;
205 			} else
206 				pcrp = &crp->Next;
207 
208 		if (!Rblkp) {
209 			strcpy(g->Message, MSG(NO_DEF_PIVOTCOL));
210 			goto err;
211 		} else if (!fncrp) {
212 			strcpy(g->Message, MSG(NO_DEF_FNCCOL));
213 			goto err;
214 		} // endif
215 
216 		if (Tabsrc) {
217 			Myc.Close();
218 			b = false;
219 
220 			// Before calling sort, initialize all
221 			nblin = Qryp->Nblin;
222 
223 			Index.Size = nblin * sizeof(int);
224 			Index.Sub = TRUE;                  // Should be small enough
225 
226 			if (!PlgDBalloc(g, NULL, Index))
227 				goto err;
228 
229 			Offset.Size = (nblin + 1) * sizeof(int);
230 			Offset.Sub = TRUE;                 // Should be small enough
231 
232 			if (!PlgDBalloc(g, NULL, Offset))
233 				goto err;
234 
235 			ndif = Qsort(g, nblin);
236 
237 			if (ndif < 0)           // error
238 				goto err;
239 
240 		} else {
241 			// The query was limited, we must get pivot column values
242 			// Returned values must be in their original character set
243 	    //  if (Myc.ExecSQL(g, "SET character_set_results=NULL", &w) == RC_FX)
244 	    //    goto err;
245 
246 			query = (char*)PlugSubAlloc(g, NULL, 0);
247 			sprintf(query, "SELECT DISTINCT `%s` FROM `%s`", Picol, Tabname);
248 			PlugSubAlloc(g, NULL, strlen(query) + 1);
249 			Myc.FreeResult();
250 
251 			// Send the source command to MySQL
252 			if (Myc.ExecSQL(g, query, &w) == RC_FX)
253 				goto err;
254 
255 			// We must have a storage query to get pivot column values
256 			if (!(qrp = Myc.GetResult(g, true)))
257 				goto err;
258 
259 			Myc.Close();
260 			b = false;
261 
262 			// Get the column list
263 			crp = qrp->Colresp;
264 			Rblkp = crp->Kdata;
265 			ndif = qrp->Nblin;
266 		} // endif Tabsrc
267 
268 		// Allocate the Value used to retieve column names
269 		if (!(valp = AllocateValue(g, Rblkp->GetType(),
270 				                          Rblkp->GetVlen(),
271 				                          Rblkp->GetPrec())))
272 			goto err;
273 
274 		// Now make the functional columns
275 		for (int i = 0; i < ndif; i++) {
276 			if (i) {
277 				crp = (PCOLRES)PlugSubAlloc(g, NULL, sizeof(COLRES));
278 				memcpy(crp, fncrp, sizeof(COLRES));
279 			} else
280 				crp = fncrp;
281 
282 			// Get the value that will be the generated column name
283 			if (Tabsrc)
284 				valp->SetValue_pvblk(Rblkp, Pex[Pof[i]]);
285 			else
286 				valp->SetValue_pvblk(Rblkp, i);
287 
288 			colname = valp->GetCharString(buf);
289 			crp->Name = PlugDup(g, colname);
290 			crp->Flag = 1;
291 
292 			// Add this column
293 			*pcrp = crp;
294 			crp->Next = NULL;
295 			pcrp = &crp->Next;
296 		} // endfor i
297 
298 		// We added ndif columns and removed 2 (picol and fncol)
299 		Qryp->Nbcol += (ndif - 2);
300 		return Qryp;
301 	} catch (int n) {
302 		if (trace(1))
303 			htrc("Exception %d: %s\n", n, g->Message);
304 	} catch (const char *msg) {
305 		strcpy(g->Message, msg);
306 	} // end catch
307 
308 err:
309 	if (b)
310 		Myc.Close();
311 
312 	return NULL;
313 } // end of MakePivotColumns
314 
315 /***********************************************************************/
316 /*  PIVAID: Compare routine for sorting pivot column values.           */
317 /***********************************************************************/
Qcompare(int * i1,int * i2)318 int PIVAID::Qcompare(int *i1, int *i2)
319   {
320   // TODO: the actual comparison between pivot column result values.
321   return Rblkp->CompVal(*i1, *i2);
322   } // end of Qcompare
323 
324 /* --------------- Implementation of the PIVOT classes --------------- */
325 
326 /***********************************************************************/
327 /*  PIVOTDEF constructor.                                              */
328 /***********************************************************************/
PIVOTDEF(void)329   PIVOTDEF::PIVOTDEF(void)
330   {
331   Host = User = Pwd = DB = NULL;
332   Tabname = Tabsrc = Picol = Fncol = Function = NULL;
333   GBdone = Accept = false;
334   Port = 0;
335   } // end of PIVOTDEF constructor
336 
337 /***********************************************************************/
338 /*  DefineAM: define specific AM block values from PIVOT table.        */
339 /***********************************************************************/
DefineAM(PGLOBAL g,LPCSTR am,int poff)340 bool PIVOTDEF::DefineAM(PGLOBAL g, LPCSTR am, int poff)
341   {
342   char *p1, *p2;
343   PHC    hc __attribute__((unused))= ((MYCAT*)Cat)->GetHandler();
344 
345   if (PRXDEF::DefineAM(g, am, poff))
346     return TRUE;
347 
348   Tabname = (char*)Tablep->GetName();
349   DB = (char*)Tablep->GetSchema();
350   Tabsrc = (char*)Tablep->GetSrc();
351 
352   Host = GetStringCatInfo(g, "Host", "localhost");
353   User = GetStringCatInfo(g, "User", "*");
354   Pwd = GetStringCatInfo(g, "Password", NULL);
355   Picol = GetStringCatInfo(g, "PivotCol", NULL);
356   Fncol = GetStringCatInfo(g, "FncCol", NULL);
357 
358   // If fncol is like avg(colname), separate Fncol and Function
359   if (Fncol && (p1 = strchr(Fncol, '(')) && (p2 = strchr(p1, ')')) &&
360       (*Fncol != '"') &&  (!*(p2+1))) {
361     *p1++ = '\0'; *p2 = '\0';
362     Function = Fncol;
363     Fncol = p1;
364   } else
365     Function = GetStringCatInfo(g, "Function", "SUM");
366 
367   GBdone = GetBoolCatInfo("Groupby", false);
368   Accept = GetBoolCatInfo("Accept", false);
369   Port = GetIntCatInfo("Port", 3306);
370   Desc = (Tabsrc) ? Tabsrc : Tabname;
371   return FALSE;
372   } // end of DefineAM
373 
374 /***********************************************************************/
375 /*  GetTable: makes a new TDB of the proper type.                      */
376 /***********************************************************************/
GetTable(PGLOBAL g,MODE)377 PTDB PIVOTDEF::GetTable(PGLOBAL g, MODE)
378   {
379   return new(g) TDBPIVOT(this);
380   } // end of GetTable
381 
382 /* ------------------------------------------------------------------- */
383 
384 /***********************************************************************/
385 /*  Implementation of the TDBPIVOT class.                              */
386 /***********************************************************************/
TDBPIVOT(PPIVOTDEF tdp)387 TDBPIVOT::TDBPIVOT(PPIVOTDEF tdp) : TDBPRX(tdp)
388   {
389   Host = tdp->Host;
390   Database = tdp->DB;
391   User = tdp->User;
392   Pwd = tdp->Pwd;
393   Port = tdp->Port;
394   Tabname = tdp->Tabname;    // Name of source table
395   Tabsrc = tdp->Tabsrc;      // SQL description of source table
396   Picol = tdp->Picol;        // Pivot column name
397   Fncol = tdp->Fncol;        // Function column name
398   Function = tdp->Function;  // Aggregate function name
399   Xcolp = NULL;              // To the FNCCOL column
400 //Xresp = NULL;              // To the pivot result column
401 //Rblkp = NULL;              // The value block of the pivot column
402   Fcolp = NULL;              // To the function column
403   Dcolp = NULL;              // To the dump column
404   GBdone = tdp->GBdone;
405   Accept = tdp->Accept;
406   Mult = -1;                // Estimated table size
407   N = 0;                    // The current table index
408   M = 0;                    // The occurence rank
409   FileStatus = 0;           // Logical End-of-File
410   RowFlag = 0;              // 0: Ok, 1: Same, 2: Skip
411   } // end of TDBPIVOT constructor
412 
413 /***********************************************************************/
414 /*  Allocate source column description block.                          */
415 /***********************************************************************/
MakeCol(PGLOBAL g,PCOLDEF cdp,PCOL cprec,int n)416 PCOL TDBPIVOT::MakeCol(PGLOBAL g, PCOLDEF cdp, PCOL cprec, int n)
417   {
418   PCOL colp;
419 
420   if (cdp->GetOffset()) {
421     colp = new(g) FNCCOL(cdp, this, cprec, n);
422 
423     if (cdp->GetOffset() > 1)
424       Dcolp = colp;
425 
426   } else
427     colp = new(g) SRCCOL(cdp, this, cprec, n);
428 
429   return colp;
430   } // end of MakeCol
431 
432 /***********************************************************************/
/*  Find default function and pivot columns.                           */
434 /***********************************************************************/
FindDefaultColumns(PGLOBAL g)435 bool TDBPIVOT::FindDefaultColumns(PGLOBAL g)
436   {
437   PCOLDEF cdp;
438   PTABDEF defp = Tdbp->GetDef();
439 
440   if (!Fncol) {
441     for (cdp = defp->GetCols(); cdp; cdp = cdp->GetNext())
442       if (!Picol || stricmp(Picol, cdp->GetName()))
443         Fncol = cdp->GetName();
444 
445     if (!Fncol) {
446       strcpy(g->Message, MSG(NO_DEF_FNCCOL));
447       return true;
448       } // endif Fncol
449 
450     } // endif Fncol
451 
452   if (!Picol) {
453     // Find default Picol as the last one not equal to Fncol
454     for (cdp = defp->GetCols(); cdp; cdp = cdp->GetNext())
455       if (stricmp(Fncol, cdp->GetName()))
456         Picol = cdp->GetName();
457 
458     if (!Picol) {
459       strcpy(g->Message, MSG(NO_DEF_PIVOTCOL));
460       return true;
461       } // endif Picol
462 
463     } // endif Picol
464 
465   return false;
466   } // end of FindDefaultColumns
467 
468 /***********************************************************************/
469 /*  Prepare the source table Query.                                    */
470 /***********************************************************************/
GetSourceTable(PGLOBAL g)471 bool TDBPIVOT::GetSourceTable(PGLOBAL g)
472   {
473   if (Tdbp)
474     return false;             // Already done
475 
476   if (!Tabsrc && Tabname) {
477     // Get the table description block of this table
478     if (!(Tdbp = GetSubTable(g, ((PPIVOTDEF)To_Def)->Tablep, true)))
479       return true;
480 
481     if (!GBdone) {
482       char   *colist;
483       PCOLDEF cdp;
484 
485       if (FindDefaultColumns(g))
486         return true;
487 
488       // Locate the suballocated colist (size is not known yet)
489       *(colist = (char*)PlugSubAlloc(g, NULL, 0)) = 0;
490 
491       // Make the column list
492       for (cdp = To_Def->GetCols(); cdp; cdp = cdp->GetNext())
493         if (!cdp->GetOffset())
494           strcat(strcat(colist, cdp->GetName()), ", ");
495 
496       // Add the Pivot column at the end of the list
497       strcat(colist, Picol);
498 
499       // Now we know how much was suballocated
500       PlugSubAlloc(g, NULL, strlen(colist) + 1);
501 
502       // Locate the source string (size is not known yet)
503       Tabsrc = (char*)PlugSubAlloc(g, NULL, 0);
504 
505       // Start making the definition
506       strcat(strcat(strcpy(Tabsrc, "SELECT "), colist), ", ");
507 
508       // Make it suitable for Pivot by doing the group by
509       strcat(strcat(Tabsrc, Function), "(");
510       strcat(strcat(strcat(Tabsrc, Fncol), ") "), Fncol);
511       strcat(strcat(Tabsrc, " FROM "), Tabname);
512       strcat(strcat(Tabsrc, " GROUP BY "), colist);
513 
514       if (Tdbp->IsView())     // Until MariaDB bug is fixed
515         strcat(strcat(Tabsrc, " ORDER BY "), colist);
516 
517       // Now we know how much was suballocated
518       PlugSubAlloc(g, NULL, strlen(Tabsrc) + 1);
519       } // endif !GBdone
520 
521   } else if (!Tabsrc) {
522     strcpy(g->Message, MSG(SRC_TABLE_UNDEF));
523     return true;
524   } // endif
525 
526   if (Tabsrc) {
527     // Get the new table description block of this source table
528     PTABLE tablep = new(g) XTAB("whatever", Tabsrc);
529 
530     tablep->SetSchema(Database);
531 
532     if (!(Tdbp = GetSubTable(g, tablep, true)))
533       return true;
534 
535     } // endif Tabsrc
536 
537   return false;
538   } // end of GetSourceTable
539 
540 /***********************************************************************/
541 /*  Make the required pivot columns.                                   */
542 /***********************************************************************/
MakePivotColumns(PGLOBAL g)543 bool TDBPIVOT::MakePivotColumns(PGLOBAL g)
544   {
545   if (!Tdbp->IsView()) {
546     // This was not done yet if GBdone is true
547     if (FindDefaultColumns(g))
548       return true;
549 
550     // Now it is time to allocate the pivot and function columns
551     if (!(Fcolp = Tdbp->ColDB(g, Fncol, 0))) {
552       // Function column not found in table
553       sprintf(g->Message, MSG(COL_ISNOT_TABLE), Fncol, Tabname);
554       return true;
555     } else if (Fcolp->InitValue(g))
556       return true;
557 
558     if (!(Xcolp = Tdbp->ColDB(g, Picol, 0))) {
559       // Pivot column not found in table
560       sprintf(g->Message, MSG(COL_ISNOT_TABLE), Picol, Tabname);
561       return true;
562     } else if (Xcolp->InitValue(g))
563       return true;
564 
565     //  Check and initialize the subtable columns
566     for (PCOL cp = Columns; cp; cp = cp->GetNext())
567       if (cp->GetAmType() == TYPE_AM_SRC) {
568         if (((PSRCCOL)cp)->Init(g, NULL))
569           return TRUE;
570 
571       } else if (cp->GetAmType() == TYPE_AM_FNC)
572         if (((PFNCCOL)cp)->InitColumn(g))
573           return TRUE;
574 
575     } // endif isview
576 
577   return false;
578   } // end of MakePivotColumns
579 
580 /***********************************************************************/
581 /*  Make the required pivot columns for an object view.                */
582 /***********************************************************************/
MakeViewColumns(PGLOBAL g)583 bool TDBPIVOT::MakeViewColumns(PGLOBAL g)
584   {
585   if (Tdbp->IsView()) {
586     // Tdbp is a view ColDB cannot be used
587     PCOL   colp, cp;
588     PTDBMY tdbp;
589 
590     if (Tdbp->GetAmType() != TYPE_AM_MYSQL) {
591       strcpy(g->Message, "View is not MySQL");
592       return true;
593     } else
594       tdbp = (PTDBMY)Tdbp;
595 
596     if (!Fncol && !(Fncol = tdbp->FindFieldColumn(Picol))) {
597       strcpy(g->Message, MSG(NO_DEF_FNCCOL));
598       return true;
599       } // endif Fncol
600 
601     if (!Picol && !(Picol = tdbp->FindFieldColumn(Fncol))) {
602       strcpy(g->Message, MSG(NO_DEF_PIVOTCOL));
603       return true;
604       } // endif Picol
605 
606     // Now it is time to allocate the pivot and function columns
607     if (!(Fcolp = tdbp->MakeFieldColumn(g, Fncol)))
608     	return true;
609 
610     if (!(Xcolp = tdbp->MakeFieldColumn(g, Picol)))
611     	return true;
612 
613     //  Check and initialize the subtable columns
614     for (cp = Columns; cp; cp = cp->GetNext())
615       if (cp->GetAmType() == TYPE_AM_SRC) {
616         if ((colp = tdbp->MakeFieldColumn(g, cp->GetName()))) {
617           ((PSRCCOL)cp)->Colp = colp;
618           ((PSRCCOL)cp)->To_Val = colp->GetValue();
619           cp->AddStatus(BUF_READ);     // All is done here
620         } else
621     			return true;
622 
623       } else if (cp->GetAmType() == TYPE_AM_FNC)
624         if (((PFNCCOL)cp)->InitColumn(g))
625           return TRUE;
626 
627     } // endif isview
628 
629   return false;
630   } // end of MakeViewColumns
631 
632 /***********************************************************************/
633 /*  PIVOT GetMaxSize: returns the maximum number of rows in the table. */
634 /***********************************************************************/
GetMaxSize(PGLOBAL g)635 int TDBPIVOT::GetMaxSize(PGLOBAL g __attribute__((unused)))
636   {
637 #if  0
638   if (MaxSize < 0)
639     MaxSize = MakePivotColumns(g);
640 
641   return MaxSize;
642 #endif // 0
643   return 10;
644   } // end of GetMaxSize
645 
646 /***********************************************************************/
647 /*  In this sample, ROWID will be the (virtual) row number,            */
648 /*  while ROWNUM will be the occurence rank in the multiple column.    */
649 /***********************************************************************/
RowNumber(PGLOBAL,bool b)650 int TDBPIVOT::RowNumber(PGLOBAL, bool b)
651   {
652   return (b) ? M : N;
653   } // end of RowNumber
654 
655 /***********************************************************************/
656 /*  PIVOT Access Method opening routine.                               */
657 /***********************************************************************/
OpenDB(PGLOBAL g)658 bool TDBPIVOT::OpenDB(PGLOBAL g)
659   {
660   if (Use == USE_OPEN) {
661     /*******************************************************************/
662     /*  Table already open, just replace it at its beginning.          */
663     /*******************************************************************/
664     N = M = 0;
665     RowFlag = 0;
666     FileStatus = 0;
667     return FALSE;
668     } // endif use
669 
670   if (Mode != MODE_READ) {
671     /*******************************************************************/
672     /* Currently PIVOT tables cannot be modified.                      */
673     /*******************************************************************/
674     sprintf(g->Message, MSG(TABLE_READ_ONLY), "PIVOT");
675     return TRUE;
676     } // endif Mode
677 
678   if (To_Key_Col || To_Kindex) {
679     /*******************************************************************/
680     /* Direct access of PIVOT tables is not implemented yet.           */
681     /*******************************************************************/
682     strcpy(g->Message, MSG(NO_PIV_DIR_ACC));
683     return TRUE;
684     } // endif To_Key_Col
685 
686   /*********************************************************************/
687   /*  Do it here if not done yet (should not be the case).             */
688   /*********************************************************************/
689   if (GetSourceTable(g))
690     return TRUE;
691 
692   // For tables, columns must be allocated before opening
693   if (MakePivotColumns(g))
694     return TRUE;
695 
696   /*********************************************************************/
697   /*  Physically open the object table.                                */
698   /*********************************************************************/
699 	if (Tdbp->OpenDB(g))
700 		return TRUE;
701 
702   Use = USE_OPEN;       // Do it now in case we are recursively called
703 
704   /*********************************************************************/
705   /*  Make all required pivot columns for object views.                */
706   /*********************************************************************/
707   return MakeViewColumns(g);
708   } // end of OpenDB
709 
710 /***********************************************************************/
711 /*  Data Base read routine for PIVOT access method.                    */
712 /***********************************************************************/
ReadDB(PGLOBAL g)713 int TDBPIVOT::ReadDB(PGLOBAL g)
714   {
715   int  rc = RC_OK;
716   bool newrow = FALSE;
717   PCOL colp;
718 
719   if (FileStatus == 2)
720     return RC_EF;
721 
722   if (FileStatus)
723     for (colp = Columns; colp; colp = colp->GetNext())
724       if (colp->GetAmType() == TYPE_AM_SRC)
725         ((PSRCCOL)colp)->SetColumn();
726 
727   // New row, reset all function column values
728   for (colp = Columns; colp; colp = colp->GetNext())
729     if (colp->GetAmType() == TYPE_AM_FNC)
730       colp->GetValue()->Reset();
731 
732   /*********************************************************************/
733   /*  Now start the multi reading process.                             */
734   /*********************************************************************/
735   do {
736     if (RowFlag != 1) {
737       if ((rc = Tdbp->ReadDB(g)) != RC_OK) {
738         if (FileStatus && rc == RC_EF) {
739           // A prepared row remains to be sent
740           FileStatus = 2;
741           rc = RC_OK;
742           } // endif FileStatus
743 
744         break;
745         } // endif rc
746 
747       for (colp = Tdbp->GetColumns(); colp; colp = colp->GetNext())
748         colp->ReadColumn(g);
749 
750       for (colp = Columns; colp; colp = colp->GetNext())
751       {
752         if (colp->GetAmType() == TYPE_AM_SRC)
753         {
754           if (FileStatus) {
755             if (((PSRCCOL)colp)->CompareLast()) {
756               newrow = (RowFlag) ? TRUE : FALSE;
757               break;
758               } // endif CompareLast
759 
760           } else
761             ((PSRCCOL)colp)->SetColumn();
762         }
763       }
764       FileStatus = 1;
765       } // endif RowFlag
766 
767     if (newrow) {
768       RowFlag = 1;
769       break;
770     } else
771       RowFlag = 2;
772 
773     // Look for the column having this header
774     for (colp = Columns; colp; colp = colp->GetNext())
775       if (colp->GetAmType() == TYPE_AM_FNC) {
776         if (((PFNCCOL)colp)->CompareColumn())
777           break;
778 
779         } // endif AmType
780 
781     if (!colp && !(colp = Dcolp)) {
782       if (!Accept) {
783         strcpy(g->Message, MSG(NO_MATCH_COL));
784         return RC_FX;
785       } else
786         continue;
787 
788       } // endif colp
789 
790     // Set the value of the matching column from the fonction value
791     colp->GetValue()->SetValue_pval(Fcolp->GetValue());
792     } while (RowFlag == 2);
793 
794   N++;
795   return rc;
796   } // end of ReadDB
797 
798 /***********************************************************************/
799 /*  WriteDB: Data Base write routine for PIVOT access methods.         */
800 /***********************************************************************/
WriteDB(PGLOBAL g)801 int TDBPIVOT::WriteDB(PGLOBAL g)
802   {
803   sprintf(g->Message, MSG(TABLE_READ_ONLY), "PIVOT");
804   return RC_FX;
805   } // end of WriteDB
806 
807 /***********************************************************************/
808 /*  Data Base delete line routine for PIVOT access methods.            */
809 /***********************************************************************/
DeleteDB(PGLOBAL g,int)810 int TDBPIVOT::DeleteDB(PGLOBAL g, int)
811   {
812   sprintf(g->Message, MSG(NO_TABLE_DEL), "PIVOT");
813   return RC_FX;
814   } // end of DeleteDB
815 
816 /***********************************************************************/
817 /*  Data Base close routine for PIVOT access method.                   */
818 /***********************************************************************/
CloseDB(PGLOBAL g)819 void TDBPIVOT::CloseDB(PGLOBAL g)
820   {
821   if (Tdbp)
822     Tdbp->CloseDB(g);
823 
824   } // end of CloseDB
825 
826 // ------------------------ FNCCOL functions ----------------------------
827 
828 /***********************************************************************/
829 /*  FNCCOL public constructor.                                       */
830 /***********************************************************************/
FNCCOL(PCOLDEF cdp,PTDB tdbp,PCOL cprec,int i)831 FNCCOL::FNCCOL(PCOLDEF cdp, PTDB tdbp, PCOL cprec, int i)
832       : COLBLK(cdp, tdbp, i)
833   {
834   if (cprec) {
835     Next = cprec->GetNext();
836     cprec->SetNext(this);
837   } else {
838     Next = tdbp->GetColumns();
839     tdbp->SetColumns(this);
840   } // endif cprec
841 
842   Value = NULL;     // We'll get a new one later
843   Hval = NULL;      // The unconverted header value
844   Xcolp = NULL;
845   } // end of FNCCOL constructor
846 
847 /***********************************************************************/
848 /*  FNCCOL initialization function.                                    */
849 /***********************************************************************/
InitColumn(PGLOBAL g)850 bool FNCCOL::InitColumn(PGLOBAL g)
851 {
852   // Must have its own value block
853   if (InitValue(g))
854     return TRUE;
855 
856   // Make a value from the column name
857   Hval = AllocateValue(g, Name, TYPE_STRING);
858   Hval->SetPrec(1);         // Case insensitive
859 
860   Xcolp = ((PTDBPIVOT)To_Tdb)->Xcolp;
861   AddStatus(BUF_READ);      // All is done here
862   return FALSE;
863 } // end of InitColumn
864 
865 /***********************************************************************/
866 /*  CompareColumn: Compare column value with source column value.      */
867 /***********************************************************************/
CompareColumn(void)868 bool FNCCOL::CompareColumn(void)
869   {
870   // Compare the unconverted values
871   return Hval->IsEqual(Xcolp->GetValue(), false);
872   } // end of CompareColumn
873 
874 // ------------------------ SRCCOL functions ----------------------------
875 
876 /***********************************************************************/
877 /*  SRCCOL public constructor.                                         */
878 /***********************************************************************/
SRCCOL(PCOLDEF cdp,PTDB tdbp,PCOL cprec,int n)879 SRCCOL::SRCCOL(PCOLDEF cdp, PTDB tdbp, PCOL cprec, int n)
880       : PRXCOL(cdp, tdbp, cprec, n)
881   {
882   } // end of SRCCOL constructor
883 
884 /***********************************************************************/
885 /*  Initialize the column as pointing to the source column.            */
886 /***********************************************************************/
Init(PGLOBAL g,PTDB tp)887 bool SRCCOL::Init(PGLOBAL g, PTDB tp)
888   {
889   if (PRXCOL::Init(g, tp))
890     return true;
891 
892   AddStatus(BUF_READ);     // All is done here
893   return false;
894   } // end of SRCCOL constructor
895 
896 /***********************************************************************/
897 /*  SetColumn: have the column value set from the source column.       */
898 /***********************************************************************/
SetColumn(void)899 void SRCCOL::SetColumn(void)
900   {
901   Value->SetValue_pval(To_Val);
902   } // end of SetColumn
903 
904 /***********************************************************************/
/*  CompareLast: Compare column value with source column value.        */
906 /***********************************************************************/
CompareLast(void)907 bool SRCCOL::CompareLast(void)
908   {
909   // Compare the unconverted values
910   return !Value->IsEqual(To_Val, true);
911   } // end of CompareColumn
912 
913 /* --------------------- End of TabPivot/TabQrs ---------------------- */
914