1 /*
2  * psql - the PostgreSQL interactive terminal
3  *
4  * Copyright (c) 2000-2019, PostgreSQL Global Development Group
5  *
6  * src/bin/psql/crosstabview.c
7  */
#include "postgres_fe.h"

#include <limits.h>

#include "common.h"
#include "crosstabview.h"
#include "pqexpbuffer.h"
#include "psqlscanslash.h"
#include "settings.h"

#include "common/logging.h"
17 
/*
 * Value/position from the resultset that goes into the horizontal or vertical
 * crosstabview header.
 *
 * The string pointers stored here are the ones handed out by PQgetvalue();
 * this module neither copies nor frees them.
 */
typedef struct _pivot_field
{
	/*
	 * Pointer obtained from PQgetvalue() for colV or colH. Each distinct
	 * value becomes an entry in the vertical header (colV), or horizontal
	 * header (colH). A Null value is represented by a NULL pointer.
	 */
	char	   *name;

	/*
	 * When a sort is requested on an alternative column, this holds
	 * PQgetvalue() for the sort column corresponding to <name>. If <name>
	 * appears multiple times, it's the first value in the order of the
	 * results that is kept. A Null value is represented by a NULL pointer.
	 * Always NULL for entries of the vertical header.
	 */
	char	   *sort_value;

	/*
	 * Rank of this value, starting at 0. Initially, it's the relative
	 * position of the first appearance of <name> in the resultset. For
	 * example, if successive rows contain B,A,C,A,D then it's B:0,A:1,C:2,D:3
	 * When a sort column is specified, ranks get updated in a final pass to
	 * reflect the desired order.
	 */
	int			rank;
} pivot_field;
48 
/* Node in avl_tree; one node per distinct header value */
typedef struct _avl_node
{
	/* Node contents */
	pivot_field field;

	/*
	 * Height of this node in the tree (number of nodes on the longest path to
	 * a leaf).  A freshly inserted node has height 1; the shared empty node
	 * avl_tree.end has height 0.
	 */
	int			height;

	/*
	 * Child nodes. [0] points to left subtree, [1] to right subtree. Never
	 * NULL, points to the empty node avl_tree.end when no left or right
	 * value.
	 */
	struct _avl_node *children[2];
} avl_node;
68 
/*
 * Control structure for the AVL tree (binary search tree kept
 * balanced with the AVL algorithm).
 *
 * Set up by avlInit() and released by avlFree().
 */
typedef struct _avl_tree
{
	int			count;			/* Total number of nodes */
	avl_node   *root;			/* root of the tree; equals end when empty */
	avl_node   *end;			/* Immutable dereferenceable empty tree */
} avl_tree;
79 
80 
/*
 * Forward declarations of file-local functions that are referenced before
 * their definition.
 */
static bool printCrosstab(const PGresult *results,
						  int num_columns, pivot_field *piv_columns, int field_for_columns,
						  int num_rows, pivot_field *piv_rows, int field_for_rows,
						  int field_for_data);
static void avlInit(avl_tree *tree);
static void avlMergeValue(avl_tree *tree, char *name, char *sort_value);
static int	avlCollectFields(avl_tree *tree, avl_node *node,
							 pivot_field *fields, int idx);
static void avlFree(avl_tree *tree, avl_node *node);
static void rankSort(int num_columns, pivot_field *piv_columns);
static int	indexOfColumn(char *arg, const PGresult *res);
static int	pivotFieldCompare(const void *a, const void *b);
static int	rankCompare(const void *a, const void *b);
94 
95 
96 /*
97  * Main entry point to this module.
98  *
99  * Process the data from *res according to the options in pset (global),
100  * to generate the horizontal and vertical headers contents,
101  * then call printCrosstab() for the actual output.
102  */
103 bool
PrintResultsInCrosstab(const PGresult * res)104 PrintResultsInCrosstab(const PGresult *res)
105 {
106 	bool		retval = false;
107 	avl_tree	piv_columns;
108 	avl_tree	piv_rows;
109 	pivot_field *array_columns = NULL;
110 	pivot_field *array_rows = NULL;
111 	int			num_columns = 0;
112 	int			num_rows = 0;
113 	int			field_for_rows;
114 	int			field_for_columns;
115 	int			field_for_data;
116 	int			sort_field_for_columns;
117 	int			rn;
118 
119 	avlInit(&piv_rows);
120 	avlInit(&piv_columns);
121 
122 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
123 	{
124 		pg_log_error("\\crosstabview: statement did not return a result set");
125 		goto error_return;
126 	}
127 
128 	if (PQnfields(res) < 3)
129 	{
130 		pg_log_error("\\crosstabview: query must return at least three columns");
131 		goto error_return;
132 	}
133 
134 	/* Process first optional arg (vertical header column) */
135 	if (pset.ctv_args[0] == NULL)
136 		field_for_rows = 0;
137 	else
138 	{
139 		field_for_rows = indexOfColumn(pset.ctv_args[0], res);
140 		if (field_for_rows < 0)
141 			goto error_return;
142 	}
143 
144 	/* Process second optional arg (horizontal header column) */
145 	if (pset.ctv_args[1] == NULL)
146 		field_for_columns = 1;
147 	else
148 	{
149 		field_for_columns = indexOfColumn(pset.ctv_args[1], res);
150 		if (field_for_columns < 0)
151 			goto error_return;
152 	}
153 
154 	/* Insist that header columns be distinct */
155 	if (field_for_columns == field_for_rows)
156 	{
157 		pg_log_error("\\crosstabview: vertical and horizontal headers must be different columns");
158 		goto error_return;
159 	}
160 
161 	/* Process third optional arg (data column) */
162 	if (pset.ctv_args[2] == NULL)
163 	{
164 		int			i;
165 
166 		/*
167 		 * If the data column was not specified, we search for the one not
168 		 * used as either vertical or horizontal headers.  Must be exactly
169 		 * three columns, or this won't be unique.
170 		 */
171 		if (PQnfields(res) != 3)
172 		{
173 			pg_log_error("\\crosstabview: data column must be specified when query returns more than three columns");
174 			goto error_return;
175 		}
176 
177 		field_for_data = -1;
178 		for (i = 0; i < PQnfields(res); i++)
179 		{
180 			if (i != field_for_rows && i != field_for_columns)
181 			{
182 				field_for_data = i;
183 				break;
184 			}
185 		}
186 		Assert(field_for_data >= 0);
187 	}
188 	else
189 	{
190 		field_for_data = indexOfColumn(pset.ctv_args[2], res);
191 		if (field_for_data < 0)
192 			goto error_return;
193 	}
194 
195 	/* Process fourth optional arg (horizontal header sort column) */
196 	if (pset.ctv_args[3] == NULL)
197 		sort_field_for_columns = -1;	/* no sort column */
198 	else
199 	{
200 		sort_field_for_columns = indexOfColumn(pset.ctv_args[3], res);
201 		if (sort_field_for_columns < 0)
202 			goto error_return;
203 	}
204 
205 	/*
206 	 * First part: accumulate the names that go into the vertical and
207 	 * horizontal headers, each into an AVL binary tree to build the set of
208 	 * DISTINCT values.
209 	 */
210 
211 	for (rn = 0; rn < PQntuples(res); rn++)
212 	{
213 		char	   *val;
214 		char	   *val1;
215 
216 		/* horizontal */
217 		val = PQgetisnull(res, rn, field_for_columns) ? NULL :
218 			PQgetvalue(res, rn, field_for_columns);
219 		val1 = NULL;
220 
221 		if (sort_field_for_columns >= 0 &&
222 			!PQgetisnull(res, rn, sort_field_for_columns))
223 			val1 = PQgetvalue(res, rn, sort_field_for_columns);
224 
225 		avlMergeValue(&piv_columns, val, val1);
226 
227 		if (piv_columns.count > CROSSTABVIEW_MAX_COLUMNS)
228 		{
229 			pg_log_error("\\crosstabview: maximum number of columns (%d) exceeded",
230 						 CROSSTABVIEW_MAX_COLUMNS);
231 			goto error_return;
232 		}
233 
234 		/* vertical */
235 		val = PQgetisnull(res, rn, field_for_rows) ? NULL :
236 			PQgetvalue(res, rn, field_for_rows);
237 
238 		avlMergeValue(&piv_rows, val, NULL);
239 	}
240 
241 	/*
242 	 * Second part: Generate sorted arrays from the AVL trees.
243 	 */
244 
245 	num_columns = piv_columns.count;
246 	num_rows = piv_rows.count;
247 
248 	array_columns = (pivot_field *)
249 		pg_malloc(sizeof(pivot_field) * num_columns);
250 
251 	array_rows = (pivot_field *)
252 		pg_malloc(sizeof(pivot_field) * num_rows);
253 
254 	avlCollectFields(&piv_columns, piv_columns.root, array_columns, 0);
255 	avlCollectFields(&piv_rows, piv_rows.root, array_rows, 0);
256 
257 	/*
258 	 * Third part: optionally, process the ranking data for the horizontal
259 	 * header
260 	 */
261 	if (sort_field_for_columns >= 0)
262 		rankSort(num_columns, array_columns);
263 
264 	/*
265 	 * Fourth part: print the crosstab'ed results.
266 	 */
267 	retval = printCrosstab(res,
268 						   num_columns, array_columns, field_for_columns,
269 						   num_rows, array_rows, field_for_rows,
270 						   field_for_data);
271 
272 error_return:
273 	avlFree(&piv_columns, piv_columns.root);
274 	avlFree(&piv_rows, piv_rows.root);
275 	pg_free(array_columns);
276 	pg_free(array_rows);
277 
278 	return retval;
279 }
280 
281 /*
282  * Output the pivoted resultset with the printTable* functions.  Return true
283  * if successful, false otherwise.
284  */
285 static bool
printCrosstab(const PGresult * results,int num_columns,pivot_field * piv_columns,int field_for_columns,int num_rows,pivot_field * piv_rows,int field_for_rows,int field_for_data)286 printCrosstab(const PGresult *results,
287 			  int num_columns, pivot_field *piv_columns, int field_for_columns,
288 			  int num_rows, pivot_field *piv_rows, int field_for_rows,
289 			  int field_for_data)
290 {
291 	printQueryOpt popt = pset.popt;
292 	printTableContent cont;
293 	int			i,
294 				rn;
295 	char		col_align;
296 	int		   *horiz_map;
297 	bool		retval = false;
298 
299 	printTableInit(&cont, &popt.topt, popt.title, num_columns + 1, num_rows);
300 
301 	/* Step 1: set target column names (horizontal header) */
302 
303 	/* The name of the first column is kept unchanged by the pivoting */
304 	printTableAddHeader(&cont,
305 						PQfname(results, field_for_rows),
306 						false,
307 						column_type_alignment(PQftype(results,
308 													  field_for_rows)));
309 
310 	/*
311 	 * To iterate over piv_columns[] by piv_columns[].rank, create a reverse
312 	 * map associating each piv_columns[].rank to its index in piv_columns.
313 	 * This avoids an O(N^2) loop later.
314 	 */
315 	horiz_map = (int *) pg_malloc(sizeof(int) * num_columns);
316 	for (i = 0; i < num_columns; i++)
317 		horiz_map[piv_columns[i].rank] = i;
318 
319 	/*
320 	 * The display alignment depends on its PQftype().
321 	 */
322 	col_align = column_type_alignment(PQftype(results, field_for_data));
323 
324 	for (i = 0; i < num_columns; i++)
325 	{
326 		char	   *colname;
327 
328 		colname = piv_columns[horiz_map[i]].name ?
329 			piv_columns[horiz_map[i]].name :
330 			(popt.nullPrint ? popt.nullPrint : "");
331 
332 		printTableAddHeader(&cont, colname, false, col_align);
333 	}
334 	pg_free(horiz_map);
335 
336 	/* Step 2: set row names in the first output column (vertical header) */
337 	for (i = 0; i < num_rows; i++)
338 	{
339 		int			k = piv_rows[i].rank;
340 
341 		cont.cells[k * (num_columns + 1)] = piv_rows[i].name ?
342 			piv_rows[i].name :
343 			(popt.nullPrint ? popt.nullPrint : "");
344 	}
345 	cont.cellsadded = num_rows * (num_columns + 1);
346 
347 	/*
348 	 * Step 3: fill in the content cells.
349 	 */
350 	for (rn = 0; rn < PQntuples(results); rn++)
351 	{
352 		int			row_number;
353 		int			col_number;
354 		pivot_field *rp,
355 				   *cp;
356 		pivot_field elt;
357 
358 		/* Find target row */
359 		if (!PQgetisnull(results, rn, field_for_rows))
360 			elt.name = PQgetvalue(results, rn, field_for_rows);
361 		else
362 			elt.name = NULL;
363 		rp = (pivot_field *) bsearch(&elt,
364 									 piv_rows,
365 									 num_rows,
366 									 sizeof(pivot_field),
367 									 pivotFieldCompare);
368 		Assert(rp != NULL);
369 		row_number = rp->rank;
370 
371 		/* Find target column */
372 		if (!PQgetisnull(results, rn, field_for_columns))
373 			elt.name = PQgetvalue(results, rn, field_for_columns);
374 		else
375 			elt.name = NULL;
376 
377 		cp = (pivot_field *) bsearch(&elt,
378 									 piv_columns,
379 									 num_columns,
380 									 sizeof(pivot_field),
381 									 pivotFieldCompare);
382 		Assert(cp != NULL);
383 		col_number = cp->rank;
384 
385 		/* Place value into cell */
386 		if (col_number >= 0 && row_number >= 0)
387 		{
388 			int			idx;
389 
390 			/* index into the cont.cells array */
391 			idx = 1 + col_number + row_number * (num_columns + 1);
392 
393 			/*
394 			 * If the cell already contains a value, raise an error.
395 			 */
396 			if (cont.cells[idx] != NULL)
397 			{
398 				pg_log_error("\\crosstabview: query result contains multiple data values for row \"%s\", column \"%s\"",
399 							 rp->name ? rp->name :
400 							 (popt.nullPrint ? popt.nullPrint : "(null)"),
401 							 cp->name ? cp->name :
402 							 (popt.nullPrint ? popt.nullPrint : "(null)"));
403 				goto error;
404 			}
405 
406 			cont.cells[idx] = !PQgetisnull(results, rn, field_for_data) ?
407 				PQgetvalue(results, rn, field_for_data) :
408 				(popt.nullPrint ? popt.nullPrint : "");
409 		}
410 	}
411 
412 	/*
413 	 * The non-initialized cells must be set to an empty string for the print
414 	 * functions
415 	 */
416 	for (i = 0; i < cont.cellsadded; i++)
417 	{
418 		if (cont.cells[i] == NULL)
419 			cont.cells[i] = "";
420 	}
421 
422 	printTable(&cont, pset.queryFout, false, pset.logfile);
423 	retval = true;
424 
425 error:
426 	printTableCleanup(&cont);
427 
428 	return retval;
429 }
430 
431 /*
432  * The avl* functions below provide a minimalistic implementation of AVL binary
433  * trees, to efficiently collect the distinct values that will form the horizontal
434  * and vertical headers. It only supports adding new values, no removal or even
435  * search.
436  */
437 static void
avlInit(avl_tree * tree)438 avlInit(avl_tree *tree)
439 {
440 	tree->end = (avl_node *) pg_malloc0(sizeof(avl_node));
441 	tree->end->children[0] = tree->end->children[1] = tree->end;
442 	tree->count = 0;
443 	tree->root = tree->end;
444 }
445 
446 /* Deallocate recursively an AVL tree, starting from node */
447 static void
avlFree(avl_tree * tree,avl_node * node)448 avlFree(avl_tree *tree, avl_node *node)
449 {
450 	if (node->children[0] != tree->end)
451 	{
452 		avlFree(tree, node->children[0]);
453 		pg_free(node->children[0]);
454 	}
455 	if (node->children[1] != tree->end)
456 	{
457 		avlFree(tree, node->children[1]);
458 		pg_free(node->children[1]);
459 	}
460 	if (node == tree->root)
461 	{
462 		/* free the root separately as it's not child of anything */
463 		if (node != tree->end)
464 			pg_free(node);
465 		/* free the tree->end struct only once and when all else is freed */
466 		pg_free(tree->end);
467 	}
468 }
469 
470 /* Set the height to 1 plus the greatest of left and right heights */
471 static void
avlUpdateHeight(avl_node * n)472 avlUpdateHeight(avl_node *n)
473 {
474 	n->height = 1 + (n->children[0]->height > n->children[1]->height ?
475 					 n->children[0]->height :
476 					 n->children[1]->height);
477 }
478 
479 /* Rotate a subtree left (dir=0) or right (dir=1). Not recursive */
480 static avl_node *
avlRotate(avl_node ** current,int dir)481 avlRotate(avl_node **current, int dir)
482 {
483 	avl_node   *before = *current;
484 	avl_node   *after = (*current)->children[dir];
485 
486 	*current = after;
487 	before->children[dir] = after->children[!dir];
488 	avlUpdateHeight(before);
489 	after->children[!dir] = before;
490 
491 	return after;
492 }
493 
494 static int
avlBalance(avl_node * n)495 avlBalance(avl_node *n)
496 {
497 	return n->children[0]->height - n->children[1]->height;
498 }
499 
500 /*
501  * After an insertion, possibly rebalance the tree so that the left and right
502  * node heights don't differ by more than 1.
503  * May update *node.
504  */
505 static void
avlAdjustBalance(avl_tree * tree,avl_node ** node)506 avlAdjustBalance(avl_tree *tree, avl_node **node)
507 {
508 	avl_node   *current = *node;
509 	int			b = avlBalance(current) / 2;
510 
511 	if (b != 0)
512 	{
513 		int			dir = (1 - b) / 2;
514 
515 		if (avlBalance(current->children[dir]) == -b)
516 			avlRotate(&current->children[dir], !dir);
517 		current = avlRotate(node, dir);
518 	}
519 	if (current != tree->end)
520 		avlUpdateHeight(current);
521 }
522 
523 /*
524  * Insert a new value/field, starting from *node, reaching the correct position
525  * in the tree by recursion.  Possibly rebalance the tree and possibly update
526  * *node.  Do nothing if the value is already present in the tree.
527  */
528 static void
avlInsertNode(avl_tree * tree,avl_node ** node,pivot_field field)529 avlInsertNode(avl_tree *tree, avl_node **node, pivot_field field)
530 {
531 	avl_node   *current = *node;
532 
533 	if (current == tree->end)
534 	{
535 		avl_node   *new_node = (avl_node *)
536 		pg_malloc(sizeof(avl_node));
537 
538 		new_node->height = 1;
539 		new_node->field = field;
540 		new_node->children[0] = new_node->children[1] = tree->end;
541 		tree->count++;
542 		*node = new_node;
543 	}
544 	else
545 	{
546 		int			cmp = pivotFieldCompare(&field, &current->field);
547 
548 		if (cmp != 0)
549 		{
550 			avlInsertNode(tree,
551 						  cmp > 0 ? &current->children[1] : &current->children[0],
552 						  field);
553 			avlAdjustBalance(tree, node);
554 		}
555 	}
556 }
557 
558 /* Insert the value into the AVL tree, if it does not preexist */
559 static void
avlMergeValue(avl_tree * tree,char * name,char * sort_value)560 avlMergeValue(avl_tree *tree, char *name, char *sort_value)
561 {
562 	pivot_field field;
563 
564 	field.name = name;
565 	field.rank = tree->count;
566 	field.sort_value = sort_value;
567 	avlInsertNode(tree, &tree->root, field);
568 }
569 
570 /*
571  * Recursively extract node values into the names array, in sorted order with a
572  * left-to-right tree traversal.
573  * Return the next candidate offset to write into the names array.
574  * fields[] must be preallocated to hold tree->count entries
575  */
576 static int
avlCollectFields(avl_tree * tree,avl_node * node,pivot_field * fields,int idx)577 avlCollectFields(avl_tree *tree, avl_node *node, pivot_field *fields, int idx)
578 {
579 	if (node == tree->end)
580 		return idx;
581 
582 	idx = avlCollectFields(tree, node->children[0], fields, idx);
583 	fields[idx] = node->field;
584 	return avlCollectFields(tree, node->children[1], fields, idx + 1);
585 }
586 
587 static void
rankSort(int num_columns,pivot_field * piv_columns)588 rankSort(int num_columns, pivot_field *piv_columns)
589 {
590 	int		   *hmap;			/* [[offset in piv_columns, rank], ...for
591 								 * every header entry] */
592 	int			i;
593 
594 	hmap = (int *) pg_malloc(sizeof(int) * num_columns * 2);
595 	for (i = 0; i < num_columns; i++)
596 	{
597 		char	   *val = piv_columns[i].sort_value;
598 
599 		/* ranking information is valid if non null and matches /^-?\d+$/ */
600 		if (val &&
601 			((*val == '-' &&
602 			  strspn(val + 1, "0123456789") == strlen(val + 1)) ||
603 			 strspn(val, "0123456789") == strlen(val)))
604 		{
605 			hmap[i * 2] = atoi(val);
606 			hmap[i * 2 + 1] = i;
607 		}
608 		else
609 		{
610 			/* invalid rank information ignored (equivalent to rank 0) */
611 			hmap[i * 2] = 0;
612 			hmap[i * 2 + 1] = i;
613 		}
614 	}
615 
616 	qsort(hmap, num_columns, sizeof(int) * 2, rankCompare);
617 
618 	for (i = 0; i < num_columns; i++)
619 	{
620 		piv_columns[hmap[i * 2 + 1]].rank = i;
621 	}
622 
623 	pg_free(hmap);
624 }
625 
626 /*
627  * Look up a column reference, which can be either:
628  * - a number from 1 to PQnfields(res)
629  * - a column name matching one of PQfname(res,...)
630  *
631  * Returns zero-based column number, or -1 if not found or ambiguous.
632  *
633  * Note: may modify contents of "arg" string.
634  */
635 static int
indexOfColumn(char * arg,const PGresult * res)636 indexOfColumn(char *arg, const PGresult *res)
637 {
638 	int			idx;
639 
640 	if (arg[0] && strspn(arg, "0123456789") == strlen(arg))
641 	{
642 		/* if arg contains only digits, it's a column number */
643 		idx = atoi(arg) - 1;
644 		if (idx < 0 || idx >= PQnfields(res))
645 		{
646 			pg_log_error("\\crosstabview: column number %d is out of range 1..%d",
647 						 idx + 1, PQnfields(res));
648 			return -1;
649 		}
650 	}
651 	else
652 	{
653 		int			i;
654 
655 		/*
656 		 * Dequote and downcase the column name.  By checking for all-digits
657 		 * before doing this, we can ensure that a quoted name is treated as a
658 		 * name even if it's all digits.
659 		 */
660 		dequote_downcase_identifier(arg, true, pset.encoding);
661 
662 		/* Now look for match(es) among res' column names */
663 		idx = -1;
664 		for (i = 0; i < PQnfields(res); i++)
665 		{
666 			if (strcmp(arg, PQfname(res, i)) == 0)
667 			{
668 				if (idx >= 0)
669 				{
670 					/* another idx was already found for the same name */
671 					pg_log_error("\\crosstabview: ambiguous column name: \"%s\"", arg);
672 					return -1;
673 				}
674 				idx = i;
675 			}
676 		}
677 		if (idx == -1)
678 		{
679 			pg_log_error("\\crosstabview: column name not found: \"%s\"", arg);
680 			return -1;
681 		}
682 	}
683 
684 	return idx;
685 }
686 
687 /*
688  * Value comparator for vertical and horizontal headers
689  * used for deduplication only.
690  * - null values are considered equal
691  * - non-null < null
692  * - non-null values are compared with strcmp()
693  */
694 static int
pivotFieldCompare(const void * a,const void * b)695 pivotFieldCompare(const void *a, const void *b)
696 {
697 	const pivot_field *pa = (const pivot_field *) a;
698 	const pivot_field *pb = (const pivot_field *) b;
699 
700 	/* test null values */
701 	if (!pb->name)
702 		return pa->name ? -1 : 0;
703 	else if (!pa->name)
704 		return 1;
705 
706 	/* non-null values */
707 	return strcmp(pa->name, pb->name);
708 }
709 
/*
 * qsort() comparator ordering elements by the int found at their start.
 *
 * Returns a negative, zero or positive value when the int at a is less than,
 * equal to or greater than the int at b.  The result is derived from
 * comparisons rather than a subtraction, because the difference of two ints
 * can overflow (e.g. INT_MAX - (-1)) and yield the wrong sign.
 */
static int
rankCompare(const void *a, const void *b)
{
	int			lhs = *((const int *) a);
	int			rhs = *((const int *) b);

	return (lhs > rhs) - (lhs < rhs);
}
715