1 /*
2  * psql - the PostgreSQL interactive terminal
3  *
4  * Copyright (c) 2000-2021, PostgreSQL Global Development Group
5  *
6  * src/bin/psql/crosstabview.c
7  */
8 #include "postgres_fe.h"
9 
10 #include "common.h"
11 #include "common/logging.h"
12 #include "crosstabview.h"
13 #include "pqexpbuffer.h"
14 #include "psqlscanslash.h"
15 #include "settings.h"
16 
17 /*
18  * Value/position from the resultset that goes into the horizontal or vertical
19  * crosstabview header.
20  */
21 typedef struct _pivot_field
22 {
23 	/*
24 	 * Pointer obtained from PQgetvalue() for colV or colH. Each distinct
25 	 * value becomes an entry in the vertical header (colV), or horizontal
26 	 * header (colH). A Null value is represented by a NULL pointer.
27 	 */
28 	char	   *name;
29 
30 	/*
31 	 * When a sort is requested on an alternative column, this holds
32 	 * PQgetvalue() for the sort column corresponding to <name>. If <name>
33 	 * appear multiple times, it's the first value in the order of the results
34 	 * that is kept. A Null value is represented by a NULL pointer.
35 	 */
36 	char	   *sort_value;
37 
38 	/*
39 	 * Rank of this value, starting at 0. Initially, it's the relative
40 	 * position of the first appearance of <name> in the resultset. For
41 	 * example, if successive rows contain B,A,C,A,D then it's B:0,A:1,C:2,D:3
42 	 * When a sort column is specified, ranks get updated in a final pass to
43 	 * reflect the desired order.
44 	 */
45 	int			rank;
46 } pivot_field;
47 
48 /* Node in avl_tree */
49 typedef struct _avl_node
50 {
51 	/* Node contents */
52 	pivot_field field;
53 
54 	/*
55 	 * Height of this node in the tree (number of nodes on the longest path to
56 	 * a leaf).
57 	 */
58 	int			height;
59 
60 	/*
61 	 * Child nodes. [0] points to left subtree, [1] to right subtree. Never
62 	 * NULL, points to the empty node avl_tree.end when no left or right
63 	 * value.
64 	 */
65 	struct _avl_node *children[2];
66 } avl_node;
67 
68 /*
69  * Control structure for the AVL tree (binary search tree kept
70  * balanced with the AVL algorithm)
71  */
72 typedef struct _avl_tree
73 {
74 	int			count;			/* Total number of nodes */
75 	avl_node   *root;			/* root of the tree */
76 	avl_node   *end;			/* Immutable dereferenceable empty tree */
77 } avl_tree;
78 
79 
80 static bool printCrosstab(const PGresult *results,
81 						  int num_columns, pivot_field *piv_columns, int field_for_columns,
82 						  int num_rows, pivot_field *piv_rows, int field_for_rows,
83 						  int field_for_data);
84 static void avlInit(avl_tree *tree);
85 static void avlMergeValue(avl_tree *tree, char *name, char *sort_value);
86 static int	avlCollectFields(avl_tree *tree, avl_node *node,
87 							 pivot_field *fields, int idx);
88 static void avlFree(avl_tree *tree, avl_node *node);
89 static void rankSort(int num_columns, pivot_field *piv_columns);
90 static int	indexOfColumn(char *arg, const PGresult *res);
91 static int	pivotFieldCompare(const void *a, const void *b);
92 static int	rankCompare(const void *a, const void *b);
93 
94 
95 /*
96  * Main entry point to this module.
97  *
98  * Process the data from *res according to the options in pset (global),
99  * to generate the horizontal and vertical headers contents,
100  * then call printCrosstab() for the actual output.
101  */
102 bool
PrintResultsInCrosstab(const PGresult * res)103 PrintResultsInCrosstab(const PGresult *res)
104 {
105 	bool		retval = false;
106 	avl_tree	piv_columns;
107 	avl_tree	piv_rows;
108 	pivot_field *array_columns = NULL;
109 	pivot_field *array_rows = NULL;
110 	int			num_columns = 0;
111 	int			num_rows = 0;
112 	int			field_for_rows;
113 	int			field_for_columns;
114 	int			field_for_data;
115 	int			sort_field_for_columns;
116 	int			rn;
117 
118 	avlInit(&piv_rows);
119 	avlInit(&piv_columns);
120 
121 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
122 	{
123 		pg_log_error("\\crosstabview: statement did not return a result set");
124 		goto error_return;
125 	}
126 
127 	if (PQnfields(res) < 3)
128 	{
129 		pg_log_error("\\crosstabview: query must return at least three columns");
130 		goto error_return;
131 	}
132 
133 	/* Process first optional arg (vertical header column) */
134 	if (pset.ctv_args[0] == NULL)
135 		field_for_rows = 0;
136 	else
137 	{
138 		field_for_rows = indexOfColumn(pset.ctv_args[0], res);
139 		if (field_for_rows < 0)
140 			goto error_return;
141 	}
142 
143 	/* Process second optional arg (horizontal header column) */
144 	if (pset.ctv_args[1] == NULL)
145 		field_for_columns = 1;
146 	else
147 	{
148 		field_for_columns = indexOfColumn(pset.ctv_args[1], res);
149 		if (field_for_columns < 0)
150 			goto error_return;
151 	}
152 
153 	/* Insist that header columns be distinct */
154 	if (field_for_columns == field_for_rows)
155 	{
156 		pg_log_error("\\crosstabview: vertical and horizontal headers must be different columns");
157 		goto error_return;
158 	}
159 
160 	/* Process third optional arg (data column) */
161 	if (pset.ctv_args[2] == NULL)
162 	{
163 		int			i;
164 
165 		/*
166 		 * If the data column was not specified, we search for the one not
167 		 * used as either vertical or horizontal headers.  Must be exactly
168 		 * three columns, or this won't be unique.
169 		 */
170 		if (PQnfields(res) != 3)
171 		{
172 			pg_log_error("\\crosstabview: data column must be specified when query returns more than three columns");
173 			goto error_return;
174 		}
175 
176 		field_for_data = -1;
177 		for (i = 0; i < PQnfields(res); i++)
178 		{
179 			if (i != field_for_rows && i != field_for_columns)
180 			{
181 				field_for_data = i;
182 				break;
183 			}
184 		}
185 		Assert(field_for_data >= 0);
186 	}
187 	else
188 	{
189 		field_for_data = indexOfColumn(pset.ctv_args[2], res);
190 		if (field_for_data < 0)
191 			goto error_return;
192 	}
193 
194 	/* Process fourth optional arg (horizontal header sort column) */
195 	if (pset.ctv_args[3] == NULL)
196 		sort_field_for_columns = -1;	/* no sort column */
197 	else
198 	{
199 		sort_field_for_columns = indexOfColumn(pset.ctv_args[3], res);
200 		if (sort_field_for_columns < 0)
201 			goto error_return;
202 	}
203 
204 	/*
205 	 * First part: accumulate the names that go into the vertical and
206 	 * horizontal headers, each into an AVL binary tree to build the set of
207 	 * DISTINCT values.
208 	 */
209 
210 	for (rn = 0; rn < PQntuples(res); rn++)
211 	{
212 		char	   *val;
213 		char	   *val1;
214 
215 		/* horizontal */
216 		val = PQgetisnull(res, rn, field_for_columns) ? NULL :
217 			PQgetvalue(res, rn, field_for_columns);
218 		val1 = NULL;
219 
220 		if (sort_field_for_columns >= 0 &&
221 			!PQgetisnull(res, rn, sort_field_for_columns))
222 			val1 = PQgetvalue(res, rn, sort_field_for_columns);
223 
224 		avlMergeValue(&piv_columns, val, val1);
225 
226 		if (piv_columns.count > CROSSTABVIEW_MAX_COLUMNS)
227 		{
228 			pg_log_error("\\crosstabview: maximum number of columns (%d) exceeded",
229 						 CROSSTABVIEW_MAX_COLUMNS);
230 			goto error_return;
231 		}
232 
233 		/* vertical */
234 		val = PQgetisnull(res, rn, field_for_rows) ? NULL :
235 			PQgetvalue(res, rn, field_for_rows);
236 
237 		avlMergeValue(&piv_rows, val, NULL);
238 	}
239 
240 	/*
241 	 * Second part: Generate sorted arrays from the AVL trees.
242 	 */
243 
244 	num_columns = piv_columns.count;
245 	num_rows = piv_rows.count;
246 
247 	array_columns = (pivot_field *)
248 		pg_malloc(sizeof(pivot_field) * num_columns);
249 
250 	array_rows = (pivot_field *)
251 		pg_malloc(sizeof(pivot_field) * num_rows);
252 
253 	avlCollectFields(&piv_columns, piv_columns.root, array_columns, 0);
254 	avlCollectFields(&piv_rows, piv_rows.root, array_rows, 0);
255 
256 	/*
257 	 * Third part: optionally, process the ranking data for the horizontal
258 	 * header
259 	 */
260 	if (sort_field_for_columns >= 0)
261 		rankSort(num_columns, array_columns);
262 
263 	/*
264 	 * Fourth part: print the crosstab'ed results.
265 	 */
266 	retval = printCrosstab(res,
267 						   num_columns, array_columns, field_for_columns,
268 						   num_rows, array_rows, field_for_rows,
269 						   field_for_data);
270 
271 error_return:
272 	avlFree(&piv_columns, piv_columns.root);
273 	avlFree(&piv_rows, piv_rows.root);
274 	pg_free(array_columns);
275 	pg_free(array_rows);
276 
277 	return retval;
278 }
279 
280 /*
281  * Output the pivoted resultset with the printTable* functions.  Return true
282  * if successful, false otherwise.
283  */
284 static bool
printCrosstab(const PGresult * results,int num_columns,pivot_field * piv_columns,int field_for_columns,int num_rows,pivot_field * piv_rows,int field_for_rows,int field_for_data)285 printCrosstab(const PGresult *results,
286 			  int num_columns, pivot_field *piv_columns, int field_for_columns,
287 			  int num_rows, pivot_field *piv_rows, int field_for_rows,
288 			  int field_for_data)
289 {
290 	printQueryOpt popt = pset.popt;
291 	printTableContent cont;
292 	int			i,
293 				rn;
294 	char		col_align;
295 	int		   *horiz_map;
296 	bool		retval = false;
297 
298 	printTableInit(&cont, &popt.topt, popt.title, num_columns + 1, num_rows);
299 
300 	/* Step 1: set target column names (horizontal header) */
301 
302 	/* The name of the first column is kept unchanged by the pivoting */
303 	printTableAddHeader(&cont,
304 						PQfname(results, field_for_rows),
305 						false,
306 						column_type_alignment(PQftype(results,
307 													  field_for_rows)));
308 
309 	/*
310 	 * To iterate over piv_columns[] by piv_columns[].rank, create a reverse
311 	 * map associating each piv_columns[].rank to its index in piv_columns.
312 	 * This avoids an O(N^2) loop later.
313 	 */
314 	horiz_map = (int *) pg_malloc(sizeof(int) * num_columns);
315 	for (i = 0; i < num_columns; i++)
316 		horiz_map[piv_columns[i].rank] = i;
317 
318 	/*
319 	 * The display alignment depends on its PQftype().
320 	 */
321 	col_align = column_type_alignment(PQftype(results, field_for_data));
322 
323 	for (i = 0; i < num_columns; i++)
324 	{
325 		char	   *colname;
326 
327 		colname = piv_columns[horiz_map[i]].name ?
328 			piv_columns[horiz_map[i]].name :
329 			(popt.nullPrint ? popt.nullPrint : "");
330 
331 		printTableAddHeader(&cont, colname, false, col_align);
332 	}
333 	pg_free(horiz_map);
334 
335 	/* Step 2: set row names in the first output column (vertical header) */
336 	for (i = 0; i < num_rows; i++)
337 	{
338 		int			k = piv_rows[i].rank;
339 
340 		cont.cells[k * (num_columns + 1)] = piv_rows[i].name ?
341 			piv_rows[i].name :
342 			(popt.nullPrint ? popt.nullPrint : "");
343 	}
344 	cont.cellsadded = num_rows * (num_columns + 1);
345 
346 	/*
347 	 * Step 3: fill in the content cells.
348 	 */
349 	for (rn = 0; rn < PQntuples(results); rn++)
350 	{
351 		int			row_number;
352 		int			col_number;
353 		pivot_field *rp,
354 				   *cp;
355 		pivot_field elt;
356 
357 		/* Find target row */
358 		if (!PQgetisnull(results, rn, field_for_rows))
359 			elt.name = PQgetvalue(results, rn, field_for_rows);
360 		else
361 			elt.name = NULL;
362 		rp = (pivot_field *) bsearch(&elt,
363 									 piv_rows,
364 									 num_rows,
365 									 sizeof(pivot_field),
366 									 pivotFieldCompare);
367 		Assert(rp != NULL);
368 		row_number = rp->rank;
369 
370 		/* Find target column */
371 		if (!PQgetisnull(results, rn, field_for_columns))
372 			elt.name = PQgetvalue(results, rn, field_for_columns);
373 		else
374 			elt.name = NULL;
375 
376 		cp = (pivot_field *) bsearch(&elt,
377 									 piv_columns,
378 									 num_columns,
379 									 sizeof(pivot_field),
380 									 pivotFieldCompare);
381 		Assert(cp != NULL);
382 		col_number = cp->rank;
383 
384 		/* Place value into cell */
385 		if (col_number >= 0 && row_number >= 0)
386 		{
387 			int			idx;
388 
389 			/* index into the cont.cells array */
390 			idx = 1 + col_number + row_number * (num_columns + 1);
391 
392 			/*
393 			 * If the cell already contains a value, raise an error.
394 			 */
395 			if (cont.cells[idx] != NULL)
396 			{
397 				pg_log_error("\\crosstabview: query result contains multiple data values for row \"%s\", column \"%s\"",
398 							 rp->name ? rp->name :
399 							 (popt.nullPrint ? popt.nullPrint : "(null)"),
400 							 cp->name ? cp->name :
401 							 (popt.nullPrint ? popt.nullPrint : "(null)"));
402 				goto error;
403 			}
404 
405 			cont.cells[idx] = !PQgetisnull(results, rn, field_for_data) ?
406 				PQgetvalue(results, rn, field_for_data) :
407 				(popt.nullPrint ? popt.nullPrint : "");
408 		}
409 	}
410 
411 	/*
412 	 * The non-initialized cells must be set to an empty string for the print
413 	 * functions
414 	 */
415 	for (i = 0; i < cont.cellsadded; i++)
416 	{
417 		if (cont.cells[i] == NULL)
418 			cont.cells[i] = "";
419 	}
420 
421 	printTable(&cont, pset.queryFout, false, pset.logfile);
422 	retval = true;
423 
424 error:
425 	printTableCleanup(&cont);
426 
427 	return retval;
428 }
429 
430 /*
431  * The avl* functions below provide a minimalistic implementation of AVL binary
432  * trees, to efficiently collect the distinct values that will form the horizontal
433  * and vertical headers. It only supports adding new values, no removal or even
434  * search.
435  */
436 static void
avlInit(avl_tree * tree)437 avlInit(avl_tree *tree)
438 {
439 	tree->end = (avl_node *) pg_malloc0(sizeof(avl_node));
440 	tree->end->children[0] = tree->end->children[1] = tree->end;
441 	tree->count = 0;
442 	tree->root = tree->end;
443 }
444 
445 /* Deallocate recursively an AVL tree, starting from node */
446 static void
avlFree(avl_tree * tree,avl_node * node)447 avlFree(avl_tree *tree, avl_node *node)
448 {
449 	if (node->children[0] != tree->end)
450 	{
451 		avlFree(tree, node->children[0]);
452 		pg_free(node->children[0]);
453 	}
454 	if (node->children[1] != tree->end)
455 	{
456 		avlFree(tree, node->children[1]);
457 		pg_free(node->children[1]);
458 	}
459 	if (node == tree->root)
460 	{
461 		/* free the root separately as it's not child of anything */
462 		if (node != tree->end)
463 			pg_free(node);
464 		/* free the tree->end struct only once and when all else is freed */
465 		pg_free(tree->end);
466 	}
467 }
468 
469 /* Set the height to 1 plus the greatest of left and right heights */
470 static void
avlUpdateHeight(avl_node * n)471 avlUpdateHeight(avl_node *n)
472 {
473 	n->height = 1 + (n->children[0]->height > n->children[1]->height ?
474 					 n->children[0]->height :
475 					 n->children[1]->height);
476 }
477 
478 /* Rotate a subtree left (dir=0) or right (dir=1). Not recursive */
479 static avl_node *
avlRotate(avl_node ** current,int dir)480 avlRotate(avl_node **current, int dir)
481 {
482 	avl_node   *before = *current;
483 	avl_node   *after = (*current)->children[dir];
484 
485 	*current = after;
486 	before->children[dir] = after->children[!dir];
487 	avlUpdateHeight(before);
488 	after->children[!dir] = before;
489 
490 	return after;
491 }
492 
493 static int
avlBalance(avl_node * n)494 avlBalance(avl_node *n)
495 {
496 	return n->children[0]->height - n->children[1]->height;
497 }
498 
499 /*
500  * After an insertion, possibly rebalance the tree so that the left and right
501  * node heights don't differ by more than 1.
502  * May update *node.
503  */
504 static void
avlAdjustBalance(avl_tree * tree,avl_node ** node)505 avlAdjustBalance(avl_tree *tree, avl_node **node)
506 {
507 	avl_node   *current = *node;
508 	int			b = avlBalance(current) / 2;
509 
510 	if (b != 0)
511 	{
512 		int			dir = (1 - b) / 2;
513 
514 		if (avlBalance(current->children[dir]) == -b)
515 			avlRotate(&current->children[dir], !dir);
516 		current = avlRotate(node, dir);
517 	}
518 	if (current != tree->end)
519 		avlUpdateHeight(current);
520 }
521 
522 /*
523  * Insert a new value/field, starting from *node, reaching the correct position
524  * in the tree by recursion.  Possibly rebalance the tree and possibly update
525  * *node.  Do nothing if the value is already present in the tree.
526  */
527 static void
avlInsertNode(avl_tree * tree,avl_node ** node,pivot_field field)528 avlInsertNode(avl_tree *tree, avl_node **node, pivot_field field)
529 {
530 	avl_node   *current = *node;
531 
532 	if (current == tree->end)
533 	{
534 		avl_node   *new_node = (avl_node *)
535 		pg_malloc(sizeof(avl_node));
536 
537 		new_node->height = 1;
538 		new_node->field = field;
539 		new_node->children[0] = new_node->children[1] = tree->end;
540 		tree->count++;
541 		*node = new_node;
542 	}
543 	else
544 	{
545 		int			cmp = pivotFieldCompare(&field, &current->field);
546 
547 		if (cmp != 0)
548 		{
549 			avlInsertNode(tree,
550 						  cmp > 0 ? &current->children[1] : &current->children[0],
551 						  field);
552 			avlAdjustBalance(tree, node);
553 		}
554 	}
555 }
556 
557 /* Insert the value into the AVL tree, if it does not preexist */
558 static void
avlMergeValue(avl_tree * tree,char * name,char * sort_value)559 avlMergeValue(avl_tree *tree, char *name, char *sort_value)
560 {
561 	pivot_field field;
562 
563 	field.name = name;
564 	field.rank = tree->count;
565 	field.sort_value = sort_value;
566 	avlInsertNode(tree, &tree->root, field);
567 }
568 
569 /*
570  * Recursively extract node values into the names array, in sorted order with a
571  * left-to-right tree traversal.
572  * Return the next candidate offset to write into the names array.
573  * fields[] must be preallocated to hold tree->count entries
574  */
575 static int
avlCollectFields(avl_tree * tree,avl_node * node,pivot_field * fields,int idx)576 avlCollectFields(avl_tree *tree, avl_node *node, pivot_field *fields, int idx)
577 {
578 	if (node == tree->end)
579 		return idx;
580 
581 	idx = avlCollectFields(tree, node->children[0], fields, idx);
582 	fields[idx] = node->field;
583 	return avlCollectFields(tree, node->children[1], fields, idx + 1);
584 }
585 
586 static void
rankSort(int num_columns,pivot_field * piv_columns)587 rankSort(int num_columns, pivot_field *piv_columns)
588 {
589 	int		   *hmap;			/* [[offset in piv_columns, rank], ...for
590 								 * every header entry] */
591 	int			i;
592 
593 	hmap = (int *) pg_malloc(sizeof(int) * num_columns * 2);
594 	for (i = 0; i < num_columns; i++)
595 	{
596 		char	   *val = piv_columns[i].sort_value;
597 
598 		/* ranking information is valid if non null and matches /^-?\d+$/ */
599 		if (val &&
600 			((*val == '-' &&
601 			  strspn(val + 1, "0123456789") == strlen(val + 1)) ||
602 			 strspn(val, "0123456789") == strlen(val)))
603 		{
604 			hmap[i * 2] = atoi(val);
605 			hmap[i * 2 + 1] = i;
606 		}
607 		else
608 		{
609 			/* invalid rank information ignored (equivalent to rank 0) */
610 			hmap[i * 2] = 0;
611 			hmap[i * 2 + 1] = i;
612 		}
613 	}
614 
615 	qsort(hmap, num_columns, sizeof(int) * 2, rankCompare);
616 
617 	for (i = 0; i < num_columns; i++)
618 	{
619 		piv_columns[hmap[i * 2 + 1]].rank = i;
620 	}
621 
622 	pg_free(hmap);
623 }
624 
625 /*
626  * Look up a column reference, which can be either:
627  * - a number from 1 to PQnfields(res)
628  * - a column name matching one of PQfname(res,...)
629  *
630  * Returns zero-based column number, or -1 if not found or ambiguous.
631  *
632  * Note: may modify contents of "arg" string.
633  */
634 static int
indexOfColumn(char * arg,const PGresult * res)635 indexOfColumn(char *arg, const PGresult *res)
636 {
637 	int			idx;
638 
639 	if (arg[0] && strspn(arg, "0123456789") == strlen(arg))
640 	{
641 		/* if arg contains only digits, it's a column number */
642 		idx = atoi(arg) - 1;
643 		if (idx < 0 || idx >= PQnfields(res))
644 		{
645 			pg_log_error("\\crosstabview: column number %d is out of range 1..%d",
646 						 idx + 1, PQnfields(res));
647 			return -1;
648 		}
649 	}
650 	else
651 	{
652 		int			i;
653 
654 		/*
655 		 * Dequote and downcase the column name.  By checking for all-digits
656 		 * before doing this, we can ensure that a quoted name is treated as a
657 		 * name even if it's all digits.
658 		 */
659 		dequote_downcase_identifier(arg, true, pset.encoding);
660 
661 		/* Now look for match(es) among res' column names */
662 		idx = -1;
663 		for (i = 0; i < PQnfields(res); i++)
664 		{
665 			if (strcmp(arg, PQfname(res, i)) == 0)
666 			{
667 				if (idx >= 0)
668 				{
669 					/* another idx was already found for the same name */
670 					pg_log_error("\\crosstabview: ambiguous column name: \"%s\"", arg);
671 					return -1;
672 				}
673 				idx = i;
674 			}
675 		}
676 		if (idx == -1)
677 		{
678 			pg_log_error("\\crosstabview: column name not found: \"%s\"", arg);
679 			return -1;
680 		}
681 	}
682 
683 	return idx;
684 }
685 
686 /*
687  * Value comparator for vertical and horizontal headers
688  * used for deduplication only.
689  * - null values are considered equal
690  * - non-null < null
691  * - non-null values are compared with strcmp()
692  */
693 static int
pivotFieldCompare(const void * a,const void * b)694 pivotFieldCompare(const void *a, const void *b)
695 {
696 	const pivot_field *pa = (const pivot_field *) a;
697 	const pivot_field *pb = (const pivot_field *) b;
698 
699 	/* test null values */
700 	if (!pb->name)
701 		return pa->name ? -1 : 0;
702 	else if (!pa->name)
703 		return 1;
704 
705 	/* non-null values */
706 	return strcmp(pa->name, pb->name);
707 }
708 
709 static int
rankCompare(const void * a,const void * b)710 rankCompare(const void *a, const void *b)
711 {
712 	return *((const int *) a) - *((const int *) b);
713 }
714