1 /* ----------
2  * pgstat.c
3  *
4  *	All the statistics collector stuff hacked up in one big, ugly file.
5  *
6  *	TODO:	- Separate collector, postmaster and backend stuff
7  *			  into different files.
8  *
9  *			- Add some automatic call for pgstat vacuuming.
10  *
11  *			- Add a pgstat config column to pg_database, so this
12  *			  entire thing can be enabled/disabled on a per db basis.
13  *
14  *	Copyright (c) 2001-2017, PostgreSQL Global Development Group
15  *
16  *	src/backend/postmaster/pgstat.c
17  * ----------
18  */
19 #include "postgres.h"
20 
21 #include <unistd.h>
22 #include <fcntl.h>
23 #include <sys/param.h>
24 #include <sys/time.h>
25 #include <sys/socket.h>
26 #include <netdb.h>
27 #include <netinet/in.h>
28 #include <arpa/inet.h>
29 #include <signal.h>
30 #include <time.h>
31 #ifdef HAVE_SYS_SELECT_H
32 #include <sys/select.h>
33 #endif
34 
35 #include "pgstat.h"
36 
37 #include "access/heapam.h"
38 #include "access/htup_details.h"
39 #include "access/transam.h"
40 #include "access/twophase_rmgr.h"
41 #include "access/xact.h"
42 #include "catalog/pg_database.h"
43 #include "catalog/pg_proc.h"
44 #include "common/ip.h"
45 #include "libpq/libpq.h"
46 #include "libpq/pqsignal.h"
47 #include "mb/pg_wchar.h"
48 #include "miscadmin.h"
49 #include "pg_trace.h"
50 #include "postmaster/autovacuum.h"
51 #include "postmaster/fork_process.h"
52 #include "postmaster/postmaster.h"
53 #include "replication/walsender.h"
54 #include "storage/backendid.h"
55 #include "storage/dsm.h"
56 #include "storage/fd.h"
57 #include "storage/ipc.h"
58 #include "storage/latch.h"
59 #include "storage/lmgr.h"
60 #include "storage/pg_shmem.h"
61 #include "storage/procsignal.h"
62 #include "storage/sinvaladt.h"
63 #include "utils/ascii.h"
64 #include "utils/guc.h"
65 #include "utils/memutils.h"
66 #include "utils/ps_status.h"
67 #include "utils/rel.h"
68 #include "utils/snapmgr.h"
69 #include "utils/timestamp.h"
70 #include "utils/tqual.h"
71 
72 
/* ----------
 * Timer definitions.
 *
 * Intervals are in milliseconds, except PGSTAT_RESTART_INTERVAL, which is
 * in seconds (it is compared against time() differences in pgstat_start).
 * ----------
 */
#define PGSTAT_STAT_INTERVAL	500 /* Minimum time between stats file
									 * updates; in milliseconds. */

#define PGSTAT_RETRY_DELAY		10	/* How long to wait between checks for a
									 * new file; in milliseconds. */

#define PGSTAT_MAX_WAIT_TIME	10000	/* Maximum time to wait for a stats
										 * file update; in milliseconds. */

#define PGSTAT_INQ_INTERVAL		640 /* How often to ping the collector for a
									 * new file; in milliseconds. */

#define PGSTAT_RESTART_INTERVAL 60	/* How often to attempt to restart a
									 * failed statistics collector; in
									 * seconds. */

/*
 * Derived iteration counts: how many PGSTAT_RETRY_DELAY-sized waits fit into
 * PGSTAT_MAX_WAIT_TIME and PGSTAT_INQ_INTERVAL respectively (integer
 * division; 1000 and 64 with the values above).
 */
#define PGSTAT_POLL_LOOP_COUNT	(PGSTAT_MAX_WAIT_TIME / PGSTAT_RETRY_DELAY)
#define PGSTAT_INQ_LOOP_COUNT	(PGSTAT_INQ_INTERVAL / PGSTAT_RETRY_DELAY)

/* Minimum receive buffer size for the collector's socket, in bytes. */
#define PGSTAT_MIN_RCVBUF		(100 * 1024)


/* ----------
 * The initial size hints for the hash tables used in the collector.
 * These are only starting sizes, not limits.
 * ----------
 */
#define PGSTAT_DB_HASH_SIZE		16
#define PGSTAT_TAB_HASH_SIZE	512
#define PGSTAT_FUNCTION_HASH_SIZE	512


/* ----------
 * Total number of backends including auxiliary
 *
 * We reserve a slot for each possible BackendId, plus one for each
 * possible auxiliary process type.  (This scheme assumes there is not
 * more than one of any auxiliary process type at a time.) MaxBackends
 * includes autovacuum workers and background workers as well.
 * ----------
 */
#define NumBackendStatSlots (MaxBackends + NUM_AUXPROCTYPES)
119 
120 
/* ----------
 * GUC parameters
 * ----------
 */
bool		pgstat_track_activities = false;
/*
 * Presumably the variable behind the "track_counts" GUC, which pgstat_init()
 * forces off when no working collector socket can be created.
 */
bool		pgstat_track_counts = false;
int			pgstat_track_functions = TRACK_FUNC_OFF;
int			pgstat_track_activity_query_size = 1024;

/* ----------
 * Built from GUC parameter
 * ----------
 */
char	   *pgstat_stat_directory = NULL;
char	   *pgstat_stat_filename = NULL;
char	   *pgstat_stat_tmpname = NULL;

/*
 * BgWriter global statistics counters (unused in other processes).
 * Stored directly in a stats message structure so it can be sent
 * without needing to copy things around.  We assume this inits to zeroes.
 */
PgStat_MsgBgWriter BgWriterStats;

/* ----------
 * Local data
 * ----------
 */

/*
 * UDP socket created by pgstat_init() for sending and receiving statistics
 * messages.  It stays PGINVALID_SOCKET if pgstat_init() failed, and
 * pgstat_start() then refuses to launch a collector.
 */
NON_EXEC_STATIC pgsocket pgStatSock = PGINVALID_SOCKET;

/* Address pgStatSock was bound to (via getsockname) and connected to. */
static struct sockaddr_storage pgStatAddr;

/*
 * time() value of the last collector launch attempt.  pgstat_start() uses it
 * to rate-limit restarts, and allow_immediate_pgstat_restart() resets it.
 */
static time_t last_pgstat_start_time;

/* NOTE(review): presumably true only inside the collector process; its uses
 * are outside this part of the file. */
static bool pgStatRunningInCollector = false;
156 
/*
 * Structures in which backends store per-table info that's waiting to be
 * sent to the collector.
 *
 * NOTE: once allocated, TabStatusArray structures are never moved or deleted
 * for the life of the backend.  Also, we zero out the t_id fields of the
 * contained PgStat_TableStatus structs whenever they are not actively in use.
 * This allows relcache pgstat_info pointers to be treated as long-lived data,
 * avoiding repeated searches in pgstat_initstats() when a relation is
 * repeatedly opened during a transaction.
 */
#define TABSTAT_QUANTUM		100 /* we alloc this many at a time */

typedef struct TabStatusArray
{
	struct TabStatusArray *tsa_next;	/* link to next array, if any */
	int			tsa_used;		/* # entries currently used */
	PgStat_TableStatus tsa_entries[TABSTAT_QUANTUM];	/* per-table data */
} TabStatusArray;

/*
 * Head of the chain of TabStatusArrays (linked through tsa_next); NULL until
 * the first array is allocated.  pgstat_report_stat() drains every array in
 * the chain and resets its tsa_used to 0.
 */
static TabStatusArray *pgStatTabList = NULL;

/*
 * pgStatTabHash entry: map from relation OID to PgStat_TableStatus pointer
 */
typedef struct TabStatHashEntry
{
	Oid			t_id;
	PgStat_TableStatus *tsa_entry;
} TabStatHashEntry;

/*
 * Hash table for O(1) t_id -> tsa_entry lookup.  pgstat_report_stat()
 * destroys it (and sets it back to NULL) before clearing the entries it
 * points at.
 */
static HTAB *pgStatTabHash = NULL;

/*
 * Backends store per-function info that's waiting to be sent to the collector
 * in this hash table (indexed by function OID).  pgstat_send_funcstats() does
 * nothing while this is NULL.
 */
static HTAB *pgStatFunctions = NULL;

/*
 * Indicates if backend has some function stats that it hasn't yet
 * sent to the collector.  Cleared by pgstat_send_funcstats().
 */
static bool have_function_stats = false;
204 
/*
 * Tuple insertion/deletion counts for an open transaction can't be propagated
 * into PgStat_TableStatus counters until we know if it is going to commit
 * or abort.  Hence, we keep these counts in per-subxact structs that live
 * in TopTransactionContext.  This data structure is designed on the assumption
 * that subxacts won't usually modify very many tables.
 */
typedef struct PgStat_SubXactStatus
{
	int			nest_level;		/* subtransaction nest level */
	struct PgStat_SubXactStatus *prev;	/* higher-level subxact if any */
	PgStat_TableXactStatus *first;	/* head of list for this subxact */
} PgStat_SubXactStatus;

/* Innermost PgStat_SubXactStatus; outer levels are reached through prev. */
static PgStat_SubXactStatus *pgStatXactStack = NULL;

/*
 * Transaction commit/rollback counts not yet reported.  pgstat_send_tabstat()
 * copies them into the next message for a valid database and resets them.
 */
static int	pgStatXactCommit = 0;
static int	pgStatXactRollback = 0;
/*
 * Accumulated block read/write time not yet reported; reported and reset
 * exactly like the counters above.  These are not static, so presumably
 * other modules add to them.
 */
PgStat_Counter pgStatBlockReadTime = 0;
PgStat_Counter pgStatBlockWriteTime = 0;

/*
 * Record that's written to 2PC state file when pgstat state is persisted.
 * NOTE(review): because this struct is persisted, changing its field order
 * or types presumably breaks reading of existing 2PC state files.
 */
typedef struct TwoPhasePgStatRecord
{
	PgStat_Counter tuples_inserted; /* tuples inserted in xact */
	PgStat_Counter tuples_updated;	/* tuples updated in xact */
	PgStat_Counter tuples_deleted;	/* tuples deleted in xact */
	PgStat_Counter inserted_pre_trunc;	/* tuples inserted prior to truncate */
	PgStat_Counter updated_pre_trunc;	/* tuples updated prior to truncate */
	PgStat_Counter deleted_pre_trunc;	/* tuples deleted prior to truncate */
	Oid			t_id;			/* table's OID */
	bool		t_shared;		/* is it a shared catalog? */
	bool		t_truncated;	/* was the relation truncated? */
} TwoPhasePgStatRecord;
239 
/*
 * Info about current "snapshot" of stats file
 */
static MemoryContext pgStatLocalContext = NULL;
static HTAB *pgStatDBHash = NULL;

/* Status for backends including auxiliary */
static LocalPgBackendStatus *localBackendStatusTable = NULL;

/* Total number of backends including auxiliary */
static int	localNumBackends = 0;

/*
 * Cluster wide statistics, kept in the stats collector.
 * Contains statistics that are not collected per database
 * or per table.
 */
static PgStat_ArchiverStats archiverStats;
static PgStat_GlobalStats globalStats;

/*
 * List of OIDs of databases we need to write out.  If an entry is InvalidOid,
 * it means to write only the shared-catalog stats ("DB 0"); otherwise, we
 * will write both that DB's data and the shared stats.
 */
static List *pending_write_requests = NIL;

/*
 * Signal handler flags.  NOTE(review): presumably set by pgstat_exit() and
 * pgstat_sighup_handler(), whose bodies are outside this part of the file.
 */
static volatile bool need_exit = false;
static volatile bool got_SIGHUP = false;

/*
 * Total time charged to functions so far in the current backend.
 * We use this to help separate "self" and "other" time charges.
 * (We assume this initializes to zero.)
 */
static instr_time total_func_time;
277 
278 
/* ----------
 * Local function forward declarations
 * ----------
 */
#ifdef EXEC_BACKEND
static pid_t pgstat_forkexec(void);
#endif

/* Collector process entry point, signal handlers and shutdown hook */
NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]) pg_attribute_noreturn();
static void pgstat_exit(SIGNAL_ARGS);
static void pgstat_beshutdown_hook(int code, Datum arg);
static void pgstat_sighup_handler(SIGNAL_ARGS);

/* Hash table entry lookup and stats file reading/writing */
static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create);
static PgStat_StatTabEntry *pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry,
					 Oid tableoid, bool create);
static void pgstat_write_statsfiles(bool permanent, bool allDbs);
static void pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent);
static HTAB *pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep);
static void pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash, bool permanent);
static void backend_read_statsfile(void);
static void pgstat_read_current_status(void);

static bool pgstat_write_statsfile_needed(void);
static bool pgstat_db_requested(Oid databaseid);

/* Backend-side helpers for sending accumulated counts */
static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg);
static void pgstat_send_funcstats(void);
static HTAB *pgstat_collect_oids(Oid catalogid);

static PgStat_TableStatus *get_tabstat_entry(Oid rel_id, bool isshared);

static void pgstat_setup_memcxt(void);

/* Presumably map wait-event codes of each class to display strings */
static const char *pgstat_get_wait_activity(WaitEventActivity w);
static const char *pgstat_get_wait_client(WaitEventClient w);
static const char *pgstat_get_wait_ipc(WaitEventIPC w);
static const char *pgstat_get_wait_timeout(WaitEventTimeout w);
static const char *pgstat_get_wait_io(WaitEventIO w);

/* Message header setup and transmission over pgStatSock */
static void pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype);
static void pgstat_send(void *msg, int len);

/* Collector-side handlers, one per incoming message type */
static void pgstat_recv_inquiry(PgStat_MsgInquiry *msg, int len);
static void pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len);
static void pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len);
static void pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len);
static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len);
static void pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, int len);
static void pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len);
static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len);
static void pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len);
static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len);
static void pgstat_recv_archiver(PgStat_MsgArchiver *msg, int len);
static void pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len);
static void pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len);
static void pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len);
static void pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len);
static void pgstat_recv_deadlock(PgStat_MsgDeadlock *msg, int len);
static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
339 
340 /* ------------------------------------------------------------
341  * Public functions called from postmaster follow
342  * ------------------------------------------------------------
343  */
344 
/* ----------
 * pgstat_init() -
 *
 *	Called from postmaster at startup. Create the resources required
 *	by the statistics collector process.  If unable to do so, do not
 *	fail --- better to let the postmaster start with stats collection
 *	disabled.
 *
 *	On success, pgStatSock is a non-blocking UDP socket that has these
 *	properties:
 *	  - it is bound to a kernel-assigned port on "localhost";
 *	  - it is connected to its own address, which is saved in pgStatAddr;
 *	  - it passed a one-byte send/receive test.
 *	On failure, pgStatSock is left as PGINVALID_SOCKET and the track_counts
 *	GUC is forced off.  All problems are reported at LOG level.
 * ----------
 */
void
pgstat_init(void)
{
	ACCEPT_TYPE_ARG3 alen;
	struct addrinfo *addrs = NULL,
			   *addr,
				hints;
	int			ret;
	fd_set		rset;
	struct timeval tv;
	char		test_byte;
	int			sel_res;
	int			tries = 0;		/* candidate addresses attempted so far */

/* Byte value sent through the new socket to check that it passes data */
#define TESTBYTEVAL ((char) 199)

	/*
	 * This static assertion verifies that we didn't mess up the calculations
	 * involved in selecting maximum payload sizes for our UDP messages.
	 * Because the only consequence of overrunning PGSTAT_MAX_MSG_SIZE would
	 * be silent performance loss from fragmentation, it seems worth having a
	 * compile-time cross-check that we didn't.
	 */
	StaticAssertStmt(sizeof(PgStat_Msg) <= PGSTAT_MAX_MSG_SIZE,
					 "maximum stats message size exceeds PGSTAT_MAX_MSG_SIZE");

	/*
	 * Create the UDP socket for sending and receiving statistic messages.
	 * Ask for datagram-socket addresses of any family, suitable for binding
	 * (AI_PASSIVE).  Every hints field is set explicitly.
	 */
	hints.ai_flags = AI_PASSIVE;
	hints.ai_family = AF_UNSPEC;
	hints.ai_socktype = SOCK_DGRAM;
	hints.ai_protocol = 0;
	hints.ai_addrlen = 0;
	hints.ai_addr = NULL;
	hints.ai_canonname = NULL;
	hints.ai_next = NULL;
	ret = pg_getaddrinfo_all("localhost", NULL, &hints, &addrs);
	if (ret || !addrs)
	{
		ereport(LOG,
				(errmsg("could not resolve \"localhost\": %s",
						gai_strerror(ret))));
		goto startup_failed;
	}

	/*
	 * On some platforms, pg_getaddrinfo_all() may return multiple addresses
	 * only one of which will actually work (eg, both IPv6 and IPv4 addresses
	 * when kernel will reject IPv6).  Worse, the failure may occur at the
	 * bind() or perhaps even connect() stage.  So we must loop through the
	 * results till we find a working combination. We will generate LOG
	 * messages, but no error, for bogus combinations.
	 *
	 * Each failure path below closes the socket, resets pgStatSock to
	 * PGINVALID_SOCKET, and moves on to the next address.
	 */
	for (addr = addrs; addr; addr = addr->ai_next)
	{
#ifdef HAVE_UNIX_SOCKETS
		/* Ignore AF_UNIX sockets, if any are returned. */
		if (addr->ai_family == AF_UNIX)
			continue;
#endif

		if (++tries > 1)
			ereport(LOG,
					(errmsg("trying another address for the statistics collector")));

		/*
		 * Create the socket.
		 */
		if ((pgStatSock = socket(addr->ai_family, SOCK_DGRAM, 0)) == PGINVALID_SOCKET)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not create socket for statistics collector: %m")));
			continue;
		}

		/*
		 * Bind it to a kernel assigned port on localhost and get the assigned
		 * port via getsockname().
		 */
		if (bind(pgStatSock, addr->ai_addr, addr->ai_addrlen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not bind socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = PGINVALID_SOCKET;
			continue;
		}

		alen = sizeof(pgStatAddr);
		if (getsockname(pgStatSock, (struct sockaddr *) &pgStatAddr, &alen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not get address of socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = PGINVALID_SOCKET;
			continue;
		}

		/*
		 * Connect the socket to its own address.  This saves a few cycles by
		 * not having to respecify the target address on every send. This also
		 * provides a kernel-level check that only packets from this same
		 * address will be received.
		 */
		if (connect(pgStatSock, (struct sockaddr *) &pgStatAddr, alen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not connect socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = PGINVALID_SOCKET;
			continue;
		}

		/*
		 * Try to send and receive a one-byte test message on the socket. This
		 * is to catch situations where the socket can be created but will not
		 * actually pass data (for instance, because kernel packet filtering
		 * rules prevent it).
		 */
		test_byte = TESTBYTEVAL;

retry1:
		if (send(pgStatSock, &test_byte, 1, 0) != 1)
		{
			if (errno == EINTR)
				goto retry1;	/* if interrupted, just retry */
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not send test message on socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = PGINVALID_SOCKET;
			continue;
		}

		/*
		 * There could possibly be a little delay before the message can be
		 * received.  We arbitrarily allow up to half a second before deciding
		 * it's broken.
		 */
		for (;;)				/* need a loop to handle EINTR */
		{
			/* select() may modify rset and tv, so reset them on each try */
			FD_ZERO(&rset);
			FD_SET(pgStatSock, &rset);

			tv.tv_sec = 0;
			tv.tv_usec = 500000;
			sel_res = select(pgStatSock + 1, &rset, NULL, NULL, &tv);
			if (sel_res >= 0 || errno != EINTR)
				break;
		}
		if (sel_res < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("select() failed in statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = PGINVALID_SOCKET;
			continue;
		}
		if (sel_res == 0 || !FD_ISSET(pgStatSock, &rset))
		{
			/*
			 * This is the case we actually think is likely, so take pains to
			 * give a specific message for it.
			 *
			 * errno will not be set meaningfully here, so don't use it.
			 */
			ereport(LOG,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("test message did not get through on socket for statistics collector")));
			closesocket(pgStatSock);
			pgStatSock = PGINVALID_SOCKET;
			continue;
		}

		test_byte++;			/* just make sure variable is changed */

retry2:
		if (recv(pgStatSock, &test_byte, 1, 0) != 1)
		{
			if (errno == EINTR)
				goto retry2;	/* if interrupted, just retry */
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not receive test message on socket for statistics collector: %m")));
			closesocket(pgStatSock);
			pgStatSock = PGINVALID_SOCKET;
			continue;
		}

		if (test_byte != TESTBYTEVAL)	/* strictly paranoia ... */
		{
			ereport(LOG,
					(errcode(ERRCODE_INTERNAL_ERROR),
					 errmsg("incorrect test message transmission on socket for statistics collector")));
			closesocket(pgStatSock);
			pgStatSock = PGINVALID_SOCKET;
			continue;
		}

		/* If we get here, we have a working socket */
		break;
	}

	/* Did we find a working address? */
	if (!addr || pgStatSock == PGINVALID_SOCKET)
		goto startup_failed;

	/*
	 * Set the socket to non-blocking IO.  This ensures that if the collector
	 * falls behind, statistics messages will be discarded; backends won't
	 * block waiting to send messages to the collector.
	 */
	if (!pg_set_noblock(pgStatSock))
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("could not set statistics collector socket to nonblocking mode: %m")));
		goto startup_failed;
	}

	/*
	 * Try to ensure that the socket's receive buffer is at least
	 * PGSTAT_MIN_RCVBUF bytes, so that it won't easily overflow and lose
	 * data.  Use of UDP protocol means that we are willing to lose data under
	 * heavy load, but we don't want it to happen just because of ridiculously
	 * small default buffer sizes (such as 8KB on older Windows versions).
	 * Failures here are only logged; the socket remains usable.
	 */
	{
		int			old_rcvbuf;
		int			new_rcvbuf;
		ACCEPT_TYPE_ARG3 rcvbufsize = sizeof(old_rcvbuf);

		if (getsockopt(pgStatSock, SOL_SOCKET, SO_RCVBUF,
					   (char *) &old_rcvbuf, &rcvbufsize) < 0)
		{
			elog(LOG, "getsockopt(SO_RCVBUF) failed: %m");
			/* if we can't get existing size, always try to set it */
			old_rcvbuf = 0;
		}

		/* Only ever grow the buffer, never shrink it */
		new_rcvbuf = PGSTAT_MIN_RCVBUF;
		if (old_rcvbuf < new_rcvbuf)
		{
			if (setsockopt(pgStatSock, SOL_SOCKET, SO_RCVBUF,
						   (char *) &new_rcvbuf, sizeof(new_rcvbuf)) < 0)
				elog(LOG, "setsockopt(SO_RCVBUF) failed: %m");
		}
	}

	/* The address list is no longer needed; pgStatAddr holds our address */
	pg_freeaddrinfo_all(hints.ai_family, addrs);

	return;

	/* Common failure exit: release everything and disable stats counting */
startup_failed:
	ereport(LOG,
			(errmsg("disabling statistics collector for lack of working socket")));

	if (addrs)
		pg_freeaddrinfo_all(hints.ai_family, addrs);

	if (pgStatSock != PGINVALID_SOCKET)
		closesocket(pgStatSock);
	pgStatSock = PGINVALID_SOCKET;

	/*
	 * Adjust GUC variables to suppress useless activity, and for debugging
	 * purposes (seeing track_counts off is a clue that we failed here). We
	 * use PGC_S_OVERRIDE because there is no point in trying to turn it back
	 * on from postgresql.conf without a restart.
	 */
	SetConfigOption("track_counts", "off", PGC_INTERNAL, PGC_S_OVERRIDE);
}
632 
633 /*
634  * subroutine for pgstat_reset_all
635  */
636 static void
pgstat_reset_remove_files(const char * directory)637 pgstat_reset_remove_files(const char *directory)
638 {
639 	DIR		   *dir;
640 	struct dirent *entry;
641 	char		fname[MAXPGPATH * 2];
642 
643 	dir = AllocateDir(directory);
644 	while ((entry = ReadDir(dir, directory)) != NULL)
645 	{
646 		int			nchars;
647 		Oid			tmp_oid;
648 
649 		/*
650 		 * Skip directory entries that don't match the file names we write.
651 		 * See get_dbstat_filename for the database-specific pattern.
652 		 */
653 		if (strncmp(entry->d_name, "global.", 7) == 0)
654 			nchars = 7;
655 		else
656 		{
657 			nchars = 0;
658 			(void) sscanf(entry->d_name, "db_%u.%n",
659 						  &tmp_oid, &nchars);
660 			if (nchars <= 0)
661 				continue;
662 			/* %u allows leading whitespace, so reject that */
663 			if (strchr("0123456789", entry->d_name[3]) == NULL)
664 				continue;
665 		}
666 
667 		if (strcmp(entry->d_name + nchars, "tmp") != 0 &&
668 			strcmp(entry->d_name + nchars, "stat") != 0)
669 			continue;
670 
671 		snprintf(fname, sizeof(fname), "%s/%s", directory,
672 				 entry->d_name);
673 		unlink(fname);
674 	}
675 	FreeDir(dir);
676 }
677 
678 /*
679  * pgstat_reset_all() -
680  *
681  * Remove the stats files.  This is currently used only if WAL
682  * recovery is needed after a crash.
683  */
684 void
pgstat_reset_all(void)685 pgstat_reset_all(void)
686 {
687 	pgstat_reset_remove_files(pgstat_stat_directory);
688 	pgstat_reset_remove_files(PGSTAT_STAT_PERMANENT_DIRECTORY);
689 }
690 
#ifdef EXEC_BACKEND

/*
 * pgstat_forkexec() -
 *
 * Build the argument vector for a statistics collector child and pass it to
 * postmaster_forkexec(), which forks and execs the new process.  Slot 2 is
 * intentionally left NULL because postmaster_forkexec fills it in.
 * Returns whatever postmaster_forkexec returns.
 */
static pid_t
pgstat_forkexec(void)
{
	char	   *av[10] = {"postgres", "--forkcol", NULL};
	int			ac = 3;			/* entries in use: program, switch, slot */

	av[ac] = NULL;
	Assert(ac < lengthof(av));

	return postmaster_forkexec(ac, av);
}
#endif							/* EXEC_BACKEND */
714 
715 
716 /*
717  * pgstat_start() -
718  *
719  *	Called from postmaster at startup or after an existing collector
720  *	died.  Attempt to fire up a fresh statistics collector.
721  *
722  *	Returns PID of child process, or 0 if fail.
723  *
724  *	Note: if fail, we will be called again from the postmaster main loop.
725  */
726 int
pgstat_start(void)727 pgstat_start(void)
728 {
729 	time_t		curtime;
730 	pid_t		pgStatPid;
731 
732 	/*
733 	 * Check that the socket is there, else pgstat_init failed and we can do
734 	 * nothing useful.
735 	 */
736 	if (pgStatSock == PGINVALID_SOCKET)
737 		return 0;
738 
739 	/*
740 	 * Do nothing if too soon since last collector start.  This is a safety
741 	 * valve to protect against continuous respawn attempts if the collector
742 	 * is dying immediately at launch.  Note that since we will be re-called
743 	 * from the postmaster main loop, we will get another chance later.
744 	 */
745 	curtime = time(NULL);
746 	if ((unsigned int) (curtime - last_pgstat_start_time) <
747 		(unsigned int) PGSTAT_RESTART_INTERVAL)
748 		return 0;
749 	last_pgstat_start_time = curtime;
750 
751 	/*
752 	 * Okay, fork off the collector.
753 	 */
754 #ifdef EXEC_BACKEND
755 	switch ((pgStatPid = pgstat_forkexec()))
756 #else
757 	switch ((pgStatPid = fork_process()))
758 #endif
759 	{
760 		case -1:
761 			ereport(LOG,
762 					(errmsg("could not fork statistics collector: %m")));
763 			return 0;
764 
765 #ifndef EXEC_BACKEND
766 		case 0:
767 			/* in postmaster child ... */
768 			InitPostmasterChild();
769 
770 			/* Close the postmaster's sockets */
771 			ClosePostmasterPorts(false);
772 
773 			/* Drop our connection to postmaster's shared memory, as well */
774 			dsm_detach_all();
775 			PGSharedMemoryDetach();
776 
777 			PgstatCollectorMain(0, NULL);
778 			break;
779 #endif
780 
781 		default:
782 			return (int) pgStatPid;
783 	}
784 
785 	/* shouldn't get here */
786 	return 0;
787 }
788 
789 void
allow_immediate_pgstat_restart(void)790 allow_immediate_pgstat_restart(void)
791 {
792 	last_pgstat_start_time = 0;
793 }
794 
795 /* ------------------------------------------------------------
796  * Public functions used by backends follow
797  *------------------------------------------------------------
798  */
799 
800 
801 /* ----------
802  * pgstat_report_stat() -
803  *
804  *	Must be called by processes that performs DML: tcop/postgres.c, logical
805  *	receiver processes, SPI worker, etc. to send the so far collected
806  *	per-table and function usage statistics to the collector.  Note that this
807  *	is called only when not within a transaction, so it is fair to use
808  *	transaction stop time as an approximation of current time.
809  * ----------
810  */
811 void
pgstat_report_stat(bool force)812 pgstat_report_stat(bool force)
813 {
814 	/* we assume this inits to all zeroes: */
815 	static const PgStat_TableCounts all_zeroes;
816 	static TimestampTz last_report = 0;
817 
818 	TimestampTz now;
819 	PgStat_MsgTabstat regular_msg;
820 	PgStat_MsgTabstat shared_msg;
821 	TabStatusArray *tsa;
822 	int			i;
823 
824 	/* Don't expend a clock check if nothing to do */
825 	if ((pgStatTabList == NULL || pgStatTabList->tsa_used == 0) &&
826 		pgStatXactCommit == 0 && pgStatXactRollback == 0 &&
827 		!have_function_stats)
828 		return;
829 
830 	/*
831 	 * Don't send a message unless it's been at least PGSTAT_STAT_INTERVAL
832 	 * msec since we last sent one, or the caller wants to force stats out.
833 	 */
834 	now = GetCurrentTransactionStopTimestamp();
835 	if (!force &&
836 		!TimestampDifferenceExceeds(last_report, now, PGSTAT_STAT_INTERVAL))
837 		return;
838 	last_report = now;
839 
840 	/*
841 	 * Destroy pgStatTabHash before we start invalidating PgStat_TableEntry
842 	 * entries it points to.  (Should we fail partway through the loop below,
843 	 * it's okay to have removed the hashtable already --- the only
844 	 * consequence is we'd get multiple entries for the same table in the
845 	 * pgStatTabList, and that's safe.)
846 	 */
847 	if (pgStatTabHash)
848 		hash_destroy(pgStatTabHash);
849 	pgStatTabHash = NULL;
850 
851 	/*
852 	 * Scan through the TabStatusArray struct(s) to find tables that actually
853 	 * have counts, and build messages to send.  We have to separate shared
854 	 * relations from regular ones because the databaseid field in the message
855 	 * header has to depend on that.
856 	 */
857 	regular_msg.m_databaseid = MyDatabaseId;
858 	shared_msg.m_databaseid = InvalidOid;
859 	regular_msg.m_nentries = 0;
860 	shared_msg.m_nentries = 0;
861 
862 	for (tsa = pgStatTabList; tsa != NULL; tsa = tsa->tsa_next)
863 	{
864 		for (i = 0; i < tsa->tsa_used; i++)
865 		{
866 			PgStat_TableStatus *entry = &tsa->tsa_entries[i];
867 			PgStat_MsgTabstat *this_msg;
868 			PgStat_TableEntry *this_ent;
869 
870 			/* Shouldn't have any pending transaction-dependent counts */
871 			Assert(entry->trans == NULL);
872 
873 			/*
874 			 * Ignore entries that didn't accumulate any actual counts, such
875 			 * as indexes that were opened by the planner but not used.
876 			 */
877 			if (memcmp(&entry->t_counts, &all_zeroes,
878 					   sizeof(PgStat_TableCounts)) == 0)
879 				continue;
880 
881 			/*
882 			 * OK, insert data into the appropriate message, and send if full.
883 			 */
884 			this_msg = entry->t_shared ? &shared_msg : &regular_msg;
885 			this_ent = &this_msg->m_entry[this_msg->m_nentries];
886 			this_ent->t_id = entry->t_id;
887 			memcpy(&this_ent->t_counts, &entry->t_counts,
888 				   sizeof(PgStat_TableCounts));
889 			if (++this_msg->m_nentries >= PGSTAT_NUM_TABENTRIES)
890 			{
891 				pgstat_send_tabstat(this_msg);
892 				this_msg->m_nentries = 0;
893 			}
894 		}
895 		/* zero out TableStatus structs after use */
896 		MemSet(tsa->tsa_entries, 0,
897 			   tsa->tsa_used * sizeof(PgStat_TableStatus));
898 		tsa->tsa_used = 0;
899 	}
900 
901 	/*
902 	 * Send partial messages.  Make sure that any pending xact commit/abort
903 	 * gets counted, even if there are no table stats to send.
904 	 */
905 	if (regular_msg.m_nentries > 0 ||
906 		pgStatXactCommit > 0 || pgStatXactRollback > 0)
907 		pgstat_send_tabstat(&regular_msg);
908 	if (shared_msg.m_nentries > 0)
909 		pgstat_send_tabstat(&shared_msg);
910 
911 	/* Now, send function statistics */
912 	pgstat_send_funcstats();
913 }
914 
915 /*
916  * Subroutine for pgstat_report_stat: finish and send a tabstat message
917  */
918 static void
pgstat_send_tabstat(PgStat_MsgTabstat * tsmsg)919 pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg)
920 {
921 	int			n;
922 	int			len;
923 
924 	/* It's unlikely we'd get here with no socket, but maybe not impossible */
925 	if (pgStatSock == PGINVALID_SOCKET)
926 		return;
927 
928 	/*
929 	 * Report and reset accumulated xact commit/rollback and I/O timings
930 	 * whenever we send a normal tabstat message
931 	 */
932 	if (OidIsValid(tsmsg->m_databaseid))
933 	{
934 		tsmsg->m_xact_commit = pgStatXactCommit;
935 		tsmsg->m_xact_rollback = pgStatXactRollback;
936 		tsmsg->m_block_read_time = pgStatBlockReadTime;
937 		tsmsg->m_block_write_time = pgStatBlockWriteTime;
938 		pgStatXactCommit = 0;
939 		pgStatXactRollback = 0;
940 		pgStatBlockReadTime = 0;
941 		pgStatBlockWriteTime = 0;
942 	}
943 	else
944 	{
945 		tsmsg->m_xact_commit = 0;
946 		tsmsg->m_xact_rollback = 0;
947 		tsmsg->m_block_read_time = 0;
948 		tsmsg->m_block_write_time = 0;
949 	}
950 
951 	n = tsmsg->m_nentries;
952 	len = offsetof(PgStat_MsgTabstat, m_entry[0]) +
953 		n * sizeof(PgStat_TableEntry);
954 
955 	pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT);
956 	pgstat_send(tsmsg, len);
957 }
958 
959 /*
960  * Subroutine for pgstat_report_stat: populate and send a function stat message
961  */
962 static void
pgstat_send_funcstats(void)963 pgstat_send_funcstats(void)
964 {
965 	/* we assume this inits to all zeroes: */
966 	static const PgStat_FunctionCounts all_zeroes;
967 
968 	PgStat_MsgFuncstat msg;
969 	PgStat_BackendFunctionEntry *entry;
970 	HASH_SEQ_STATUS fstat;
971 
972 	if (pgStatFunctions == NULL)
973 		return;
974 
975 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_FUNCSTAT);
976 	msg.m_databaseid = MyDatabaseId;
977 	msg.m_nentries = 0;
978 
979 	hash_seq_init(&fstat, pgStatFunctions);
980 	while ((entry = (PgStat_BackendFunctionEntry *) hash_seq_search(&fstat)) != NULL)
981 	{
982 		PgStat_FunctionEntry *m_ent;
983 
984 		/* Skip it if no counts accumulated since last time */
985 		if (memcmp(&entry->f_counts, &all_zeroes,
986 				   sizeof(PgStat_FunctionCounts)) == 0)
987 			continue;
988 
989 		/* need to convert format of time accumulators */
990 		m_ent = &msg.m_entry[msg.m_nentries];
991 		m_ent->f_id = entry->f_id;
992 		m_ent->f_numcalls = entry->f_counts.f_numcalls;
993 		m_ent->f_total_time = INSTR_TIME_GET_MICROSEC(entry->f_counts.f_total_time);
994 		m_ent->f_self_time = INSTR_TIME_GET_MICROSEC(entry->f_counts.f_self_time);
995 
996 		if (++msg.m_nentries >= PGSTAT_NUM_FUNCENTRIES)
997 		{
998 			pgstat_send(&msg, offsetof(PgStat_MsgFuncstat, m_entry[0]) +
999 						msg.m_nentries * sizeof(PgStat_FunctionEntry));
1000 			msg.m_nentries = 0;
1001 		}
1002 
1003 		/* reset the entry's counts */
1004 		MemSet(&entry->f_counts, 0, sizeof(PgStat_FunctionCounts));
1005 	}
1006 
1007 	if (msg.m_nentries > 0)
1008 		pgstat_send(&msg, offsetof(PgStat_MsgFuncstat, m_entry[0]) +
1009 					msg.m_nentries * sizeof(PgStat_FunctionEntry));
1010 
1011 	have_function_stats = false;
1012 }
1013 
1014 
1015 /* ----------
1016  * pgstat_vacuum_stat() -
1017  *
1018  *	Will tell the collector about objects he can get rid of.
1019  * ----------
1020  */
1021 void
pgstat_vacuum_stat(void)1022 pgstat_vacuum_stat(void)
1023 {
1024 	HTAB	   *htab;
1025 	PgStat_MsgTabpurge msg;
1026 	PgStat_MsgFuncpurge f_msg;
1027 	HASH_SEQ_STATUS hstat;
1028 	PgStat_StatDBEntry *dbentry;
1029 	PgStat_StatTabEntry *tabentry;
1030 	PgStat_StatFuncEntry *funcentry;
1031 	int			len;
1032 
1033 	if (pgStatSock == PGINVALID_SOCKET)
1034 		return;
1035 
1036 	/*
1037 	 * If not done for this transaction, read the statistics collector stats
1038 	 * file into some hash tables.
1039 	 */
1040 	backend_read_statsfile();
1041 
1042 	/*
1043 	 * Read pg_database and make a list of OIDs of all existing databases
1044 	 */
1045 	htab = pgstat_collect_oids(DatabaseRelationId);
1046 
1047 	/*
1048 	 * Search the database hash table for dead databases and tell the
1049 	 * collector to drop them.
1050 	 */
1051 	hash_seq_init(&hstat, pgStatDBHash);
1052 	while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
1053 	{
1054 		Oid			dbid = dbentry->databaseid;
1055 
1056 		CHECK_FOR_INTERRUPTS();
1057 
1058 		/* the DB entry for shared tables (with InvalidOid) is never dropped */
1059 		if (OidIsValid(dbid) &&
1060 			hash_search(htab, (void *) &dbid, HASH_FIND, NULL) == NULL)
1061 			pgstat_drop_database(dbid);
1062 	}
1063 
1064 	/* Clean up */
1065 	hash_destroy(htab);
1066 
1067 	/*
1068 	 * Lookup our own database entry; if not found, nothing more to do.
1069 	 */
1070 	dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
1071 												 (void *) &MyDatabaseId,
1072 												 HASH_FIND, NULL);
1073 	if (dbentry == NULL || dbentry->tables == NULL)
1074 		return;
1075 
1076 	/*
1077 	 * Similarly to above, make a list of all known relations in this DB.
1078 	 */
1079 	htab = pgstat_collect_oids(RelationRelationId);
1080 
1081 	/*
1082 	 * Initialize our messages table counter to zero
1083 	 */
1084 	msg.m_nentries = 0;
1085 
1086 	/*
1087 	 * Check for all tables listed in stats hashtable if they still exist.
1088 	 */
1089 	hash_seq_init(&hstat, dbentry->tables);
1090 	while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&hstat)) != NULL)
1091 	{
1092 		Oid			tabid = tabentry->tableid;
1093 
1094 		CHECK_FOR_INTERRUPTS();
1095 
1096 		if (hash_search(htab, (void *) &tabid, HASH_FIND, NULL) != NULL)
1097 			continue;
1098 
1099 		/*
1100 		 * Not there, so add this table's Oid to the message
1101 		 */
1102 		msg.m_tableid[msg.m_nentries++] = tabid;
1103 
1104 		/*
1105 		 * If the message is full, send it out and reinitialize to empty
1106 		 */
1107 		if (msg.m_nentries >= PGSTAT_NUM_TABPURGE)
1108 		{
1109 			len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
1110 				+ msg.m_nentries * sizeof(Oid);
1111 
1112 			pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
1113 			msg.m_databaseid = MyDatabaseId;
1114 			pgstat_send(&msg, len);
1115 
1116 			msg.m_nentries = 0;
1117 		}
1118 	}
1119 
1120 	/*
1121 	 * Send the rest
1122 	 */
1123 	if (msg.m_nentries > 0)
1124 	{
1125 		len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
1126 			+ msg.m_nentries * sizeof(Oid);
1127 
1128 		pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
1129 		msg.m_databaseid = MyDatabaseId;
1130 		pgstat_send(&msg, len);
1131 	}
1132 
1133 	/* Clean up */
1134 	hash_destroy(htab);
1135 
1136 	/*
1137 	 * Now repeat the above steps for functions.  However, we needn't bother
1138 	 * in the common case where no function stats are being collected.
1139 	 */
1140 	if (dbentry->functions != NULL &&
1141 		hash_get_num_entries(dbentry->functions) > 0)
1142 	{
1143 		htab = pgstat_collect_oids(ProcedureRelationId);
1144 
1145 		pgstat_setheader(&f_msg.m_hdr, PGSTAT_MTYPE_FUNCPURGE);
1146 		f_msg.m_databaseid = MyDatabaseId;
1147 		f_msg.m_nentries = 0;
1148 
1149 		hash_seq_init(&hstat, dbentry->functions);
1150 		while ((funcentry = (PgStat_StatFuncEntry *) hash_seq_search(&hstat)) != NULL)
1151 		{
1152 			Oid			funcid = funcentry->functionid;
1153 
1154 			CHECK_FOR_INTERRUPTS();
1155 
1156 			if (hash_search(htab, (void *) &funcid, HASH_FIND, NULL) != NULL)
1157 				continue;
1158 
1159 			/*
1160 			 * Not there, so add this function's Oid to the message
1161 			 */
1162 			f_msg.m_functionid[f_msg.m_nentries++] = funcid;
1163 
1164 			/*
1165 			 * If the message is full, send it out and reinitialize to empty
1166 			 */
1167 			if (f_msg.m_nentries >= PGSTAT_NUM_FUNCPURGE)
1168 			{
1169 				len = offsetof(PgStat_MsgFuncpurge, m_functionid[0])
1170 					+ f_msg.m_nentries * sizeof(Oid);
1171 
1172 				pgstat_send(&f_msg, len);
1173 
1174 				f_msg.m_nentries = 0;
1175 			}
1176 		}
1177 
1178 		/*
1179 		 * Send the rest
1180 		 */
1181 		if (f_msg.m_nentries > 0)
1182 		{
1183 			len = offsetof(PgStat_MsgFuncpurge, m_functionid[0])
1184 				+ f_msg.m_nentries * sizeof(Oid);
1185 
1186 			pgstat_send(&f_msg, len);
1187 		}
1188 
1189 		hash_destroy(htab);
1190 	}
1191 }
1192 
1193 
1194 /* ----------
1195  * pgstat_collect_oids() -
1196  *
1197  *	Collect the OIDs of all objects listed in the specified system catalog
1198  *	into a temporary hash table.  Caller should hash_destroy the result
1199  *	when done with it.  (However, we make the table in CurrentMemoryContext
1200  *	so that it will be freed properly in event of an error.)
1201  * ----------
1202  */
1203 static HTAB *
pgstat_collect_oids(Oid catalogid)1204 pgstat_collect_oids(Oid catalogid)
1205 {
1206 	HTAB	   *htab;
1207 	HASHCTL		hash_ctl;
1208 	Relation	rel;
1209 	HeapScanDesc scan;
1210 	HeapTuple	tup;
1211 	Snapshot	snapshot;
1212 
1213 	memset(&hash_ctl, 0, sizeof(hash_ctl));
1214 	hash_ctl.keysize = sizeof(Oid);
1215 	hash_ctl.entrysize = sizeof(Oid);
1216 	hash_ctl.hcxt = CurrentMemoryContext;
1217 	htab = hash_create("Temporary table of OIDs",
1218 					   PGSTAT_TAB_HASH_SIZE,
1219 					   &hash_ctl,
1220 					   HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
1221 
1222 	rel = heap_open(catalogid, AccessShareLock);
1223 	snapshot = RegisterSnapshot(GetLatestSnapshot());
1224 	scan = heap_beginscan(rel, snapshot, 0, NULL);
1225 	while ((tup = heap_getnext(scan, ForwardScanDirection)) != NULL)
1226 	{
1227 		Oid			thisoid = HeapTupleGetOid(tup);
1228 
1229 		CHECK_FOR_INTERRUPTS();
1230 
1231 		(void) hash_search(htab, (void *) &thisoid, HASH_ENTER, NULL);
1232 	}
1233 	heap_endscan(scan);
1234 	UnregisterSnapshot(snapshot);
1235 	heap_close(rel, AccessShareLock);
1236 
1237 	return htab;
1238 }
1239 
1240 
1241 /* ----------
1242  * pgstat_drop_database() -
1243  *
1244  *	Tell the collector that we just dropped a database.
1245  *	(If the message gets lost, we will still clean the dead DB eventually
1246  *	via future invocations of pgstat_vacuum_stat().)
1247  * ----------
1248  */
1249 void
pgstat_drop_database(Oid databaseid)1250 pgstat_drop_database(Oid databaseid)
1251 {
1252 	PgStat_MsgDropdb msg;
1253 
1254 	if (pgStatSock == PGINVALID_SOCKET)
1255 		return;
1256 
1257 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DROPDB);
1258 	msg.m_databaseid = databaseid;
1259 	pgstat_send(&msg, sizeof(msg));
1260 }
1261 
1262 
1263 /* ----------
1264  * pgstat_drop_relation() -
1265  *
1266  *	Tell the collector that we just dropped a relation.
1267  *	(If the message gets lost, we will still clean the dead entry eventually
1268  *	via future invocations of pgstat_vacuum_stat().)
1269  *
1270  *	Currently not used for lack of any good place to call it; we rely
1271  *	entirely on pgstat_vacuum_stat() to clean out stats for dead rels.
1272  * ----------
1273  */
1274 #ifdef NOT_USED
1275 void
pgstat_drop_relation(Oid relid)1276 pgstat_drop_relation(Oid relid)
1277 {
1278 	PgStat_MsgTabpurge msg;
1279 	int			len;
1280 
1281 	if (pgStatSock == PGINVALID_SOCKET)
1282 		return;
1283 
1284 	msg.m_tableid[0] = relid;
1285 	msg.m_nentries = 1;
1286 
1287 	len = offsetof(PgStat_MsgTabpurge, m_tableid[0]) + sizeof(Oid);
1288 
1289 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
1290 	msg.m_databaseid = MyDatabaseId;
1291 	pgstat_send(&msg, len);
1292 }
1293 #endif							/* NOT_USED */
1294 
1295 
1296 /* ----------
1297  * pgstat_reset_counters() -
1298  *
1299  *	Tell the statistics collector to reset counters for our database.
1300  *
1301  *	Permission checking for this function is managed through the normal
1302  *	GRANT system.
1303  * ----------
1304  */
1305 void
pgstat_reset_counters(void)1306 pgstat_reset_counters(void)
1307 {
1308 	PgStat_MsgResetcounter msg;
1309 
1310 	if (pgStatSock == PGINVALID_SOCKET)
1311 		return;
1312 
1313 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETCOUNTER);
1314 	msg.m_databaseid = MyDatabaseId;
1315 	pgstat_send(&msg, sizeof(msg));
1316 }
1317 
1318 /* ----------
1319  * pgstat_reset_shared_counters() -
1320  *
1321  *	Tell the statistics collector to reset cluster-wide shared counters.
1322  *
1323  *	Permission checking for this function is managed through the normal
1324  *	GRANT system.
1325  * ----------
1326  */
1327 void
pgstat_reset_shared_counters(const char * target)1328 pgstat_reset_shared_counters(const char *target)
1329 {
1330 	PgStat_MsgResetsharedcounter msg;
1331 
1332 	if (pgStatSock == PGINVALID_SOCKET)
1333 		return;
1334 
1335 	if (strcmp(target, "archiver") == 0)
1336 		msg.m_resettarget = RESET_ARCHIVER;
1337 	else if (strcmp(target, "bgwriter") == 0)
1338 		msg.m_resettarget = RESET_BGWRITER;
1339 	else
1340 		ereport(ERROR,
1341 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1342 				 errmsg("unrecognized reset target: \"%s\"", target),
1343 				 errhint("Target must be \"archiver\" or \"bgwriter\".")));
1344 
1345 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSHAREDCOUNTER);
1346 	pgstat_send(&msg, sizeof(msg));
1347 }
1348 
1349 /* ----------
1350  * pgstat_reset_single_counter() -
1351  *
1352  *	Tell the statistics collector to reset a single counter.
1353  *
1354  *	Permission checking for this function is managed through the normal
1355  *	GRANT system.
1356  * ----------
1357  */
1358 void
pgstat_reset_single_counter(Oid objoid,PgStat_Single_Reset_Type type)1359 pgstat_reset_single_counter(Oid objoid, PgStat_Single_Reset_Type type)
1360 {
1361 	PgStat_MsgResetsinglecounter msg;
1362 
1363 	if (pgStatSock == PGINVALID_SOCKET)
1364 		return;
1365 
1366 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSINGLECOUNTER);
1367 	msg.m_databaseid = MyDatabaseId;
1368 	msg.m_resettype = type;
1369 	msg.m_objectid = objoid;
1370 
1371 	pgstat_send(&msg, sizeof(msg));
1372 }
1373 
1374 /* ----------
1375  * pgstat_report_autovac() -
1376  *
1377  *	Called from autovacuum.c to report startup of an autovacuum process.
1378  *	We are called before InitPostgres is done, so can't rely on MyDatabaseId;
1379  *	the db OID must be passed in, instead.
1380  * ----------
1381  */
1382 void
pgstat_report_autovac(Oid dboid)1383 pgstat_report_autovac(Oid dboid)
1384 {
1385 	PgStat_MsgAutovacStart msg;
1386 
1387 	if (pgStatSock == PGINVALID_SOCKET)
1388 		return;
1389 
1390 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_AUTOVAC_START);
1391 	msg.m_databaseid = dboid;
1392 	msg.m_start_time = GetCurrentTimestamp();
1393 
1394 	pgstat_send(&msg, sizeof(msg));
1395 }
1396 
1397 
1398 /* ---------
1399  * pgstat_report_vacuum() -
1400  *
1401  *	Tell the collector about the table we just vacuumed.
1402  * ---------
1403  */
1404 void
pgstat_report_vacuum(Oid tableoid,bool shared,PgStat_Counter livetuples,PgStat_Counter deadtuples)1405 pgstat_report_vacuum(Oid tableoid, bool shared,
1406 					 PgStat_Counter livetuples, PgStat_Counter deadtuples)
1407 {
1408 	PgStat_MsgVacuum msg;
1409 
1410 	if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
1411 		return;
1412 
1413 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_VACUUM);
1414 	msg.m_databaseid = shared ? InvalidOid : MyDatabaseId;
1415 	msg.m_tableoid = tableoid;
1416 	msg.m_autovacuum = IsAutoVacuumWorkerProcess();
1417 	msg.m_vacuumtime = GetCurrentTimestamp();
1418 	msg.m_live_tuples = livetuples;
1419 	msg.m_dead_tuples = deadtuples;
1420 	pgstat_send(&msg, sizeof(msg));
1421 }
1422 
1423 /* --------
1424  * pgstat_report_analyze() -
1425  *
1426  *	Tell the collector about the table we just analyzed.
1427  *
1428  * Caller must provide new live- and dead-tuples estimates, as well as a
1429  * flag indicating whether to reset the changes_since_analyze counter.
1430  * --------
1431  */
1432 void
pgstat_report_analyze(Relation rel,PgStat_Counter livetuples,PgStat_Counter deadtuples,bool resetcounter)1433 pgstat_report_analyze(Relation rel,
1434 					  PgStat_Counter livetuples, PgStat_Counter deadtuples,
1435 					  bool resetcounter)
1436 {
1437 	PgStat_MsgAnalyze msg;
1438 
1439 	if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
1440 		return;
1441 
1442 	/*
1443 	 * Unlike VACUUM, ANALYZE might be running inside a transaction that has
1444 	 * already inserted and/or deleted rows in the target table. ANALYZE will
1445 	 * have counted such rows as live or dead respectively. Because we will
1446 	 * report our counts of such rows at transaction end, we should subtract
1447 	 * off these counts from what we send to the collector now, else they'll
1448 	 * be double-counted after commit.  (This approach also ensures that the
1449 	 * collector ends up with the right numbers if we abort instead of
1450 	 * committing.)
1451 	 */
1452 	if (rel->pgstat_info != NULL)
1453 	{
1454 		PgStat_TableXactStatus *trans;
1455 
1456 		for (trans = rel->pgstat_info->trans; trans; trans = trans->upper)
1457 		{
1458 			livetuples -= trans->tuples_inserted - trans->tuples_deleted;
1459 			deadtuples -= trans->tuples_updated + trans->tuples_deleted;
1460 		}
1461 		/* count stuff inserted by already-aborted subxacts, too */
1462 		deadtuples -= rel->pgstat_info->t_counts.t_delta_dead_tuples;
1463 		/* Since ANALYZE's counts are estimates, we could have underflowed */
1464 		livetuples = Max(livetuples, 0);
1465 		deadtuples = Max(deadtuples, 0);
1466 	}
1467 
1468 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ANALYZE);
1469 	msg.m_databaseid = rel->rd_rel->relisshared ? InvalidOid : MyDatabaseId;
1470 	msg.m_tableoid = RelationGetRelid(rel);
1471 	msg.m_autovacuum = IsAutoVacuumWorkerProcess();
1472 	msg.m_resetcounter = resetcounter;
1473 	msg.m_analyzetime = GetCurrentTimestamp();
1474 	msg.m_live_tuples = livetuples;
1475 	msg.m_dead_tuples = deadtuples;
1476 	pgstat_send(&msg, sizeof(msg));
1477 }
1478 
1479 /* --------
1480  * pgstat_report_recovery_conflict() -
1481  *
1482  *	Tell the collector about a Hot Standby recovery conflict.
1483  * --------
1484  */
1485 void
pgstat_report_recovery_conflict(int reason)1486 pgstat_report_recovery_conflict(int reason)
1487 {
1488 	PgStat_MsgRecoveryConflict msg;
1489 
1490 	if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
1491 		return;
1492 
1493 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RECOVERYCONFLICT);
1494 	msg.m_databaseid = MyDatabaseId;
1495 	msg.m_reason = reason;
1496 	pgstat_send(&msg, sizeof(msg));
1497 }
1498 
1499 /* --------
1500  * pgstat_report_deadlock() -
1501  *
1502  *	Tell the collector about a deadlock detected.
1503  * --------
1504  */
1505 void
pgstat_report_deadlock(void)1506 pgstat_report_deadlock(void)
1507 {
1508 	PgStat_MsgDeadlock msg;
1509 
1510 	if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
1511 		return;
1512 
1513 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DEADLOCK);
1514 	msg.m_databaseid = MyDatabaseId;
1515 	pgstat_send(&msg, sizeof(msg));
1516 }
1517 
1518 /* --------
1519  * pgstat_report_tempfile() -
1520  *
1521  *	Tell the collector about a temporary file.
1522  * --------
1523  */
1524 void
pgstat_report_tempfile(size_t filesize)1525 pgstat_report_tempfile(size_t filesize)
1526 {
1527 	PgStat_MsgTempFile msg;
1528 
1529 	if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
1530 		return;
1531 
1532 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TEMPFILE);
1533 	msg.m_databaseid = MyDatabaseId;
1534 	msg.m_filesize = filesize;
1535 	pgstat_send(&msg, sizeof(msg));
1536 }
1537 
1538 
1539 /* ----------
1540  * pgstat_ping() -
1541  *
1542  *	Send some junk data to the collector to increase traffic.
1543  * ----------
1544  */
1545 void
pgstat_ping(void)1546 pgstat_ping(void)
1547 {
1548 	PgStat_MsgDummy msg;
1549 
1550 	if (pgStatSock == PGINVALID_SOCKET)
1551 		return;
1552 
1553 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DUMMY);
1554 	pgstat_send(&msg, sizeof(msg));
1555 }
1556 
1557 /* ----------
1558  * pgstat_send_inquiry() -
1559  *
1560  *	Notify collector that we need fresh data.
1561  * ----------
1562  */
1563 static void
pgstat_send_inquiry(TimestampTz clock_time,TimestampTz cutoff_time,Oid databaseid)1564 pgstat_send_inquiry(TimestampTz clock_time, TimestampTz cutoff_time, Oid databaseid)
1565 {
1566 	PgStat_MsgInquiry msg;
1567 
1568 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_INQUIRY);
1569 	msg.clock_time = clock_time;
1570 	msg.cutoff_time = cutoff_time;
1571 	msg.databaseid = databaseid;
1572 	pgstat_send(&msg, sizeof(msg));
1573 }
1574 
1575 
1576 /*
1577  * Initialize function call usage data.
1578  * Called by the executor before invoking a function.
1579  */
1580 void
pgstat_init_function_usage(FunctionCallInfoData * fcinfo,PgStat_FunctionCallUsage * fcu)1581 pgstat_init_function_usage(FunctionCallInfoData *fcinfo,
1582 						   PgStat_FunctionCallUsage *fcu)
1583 {
1584 	PgStat_BackendFunctionEntry *htabent;
1585 	bool		found;
1586 
1587 	if (pgstat_track_functions <= fcinfo->flinfo->fn_stats)
1588 	{
1589 		/* stats not wanted */
1590 		fcu->fs = NULL;
1591 		return;
1592 	}
1593 
1594 	if (!pgStatFunctions)
1595 	{
1596 		/* First time through - initialize function stat table */
1597 		HASHCTL		hash_ctl;
1598 
1599 		memset(&hash_ctl, 0, sizeof(hash_ctl));
1600 		hash_ctl.keysize = sizeof(Oid);
1601 		hash_ctl.entrysize = sizeof(PgStat_BackendFunctionEntry);
1602 		pgStatFunctions = hash_create("Function stat entries",
1603 									  PGSTAT_FUNCTION_HASH_SIZE,
1604 									  &hash_ctl,
1605 									  HASH_ELEM | HASH_BLOBS);
1606 	}
1607 
1608 	/* Get the stats entry for this function, create if necessary */
1609 	htabent = hash_search(pgStatFunctions, &fcinfo->flinfo->fn_oid,
1610 						  HASH_ENTER, &found);
1611 	if (!found)
1612 		MemSet(&htabent->f_counts, 0, sizeof(PgStat_FunctionCounts));
1613 
1614 	fcu->fs = &htabent->f_counts;
1615 
1616 	/* save stats for this function, later used to compensate for recursion */
1617 	fcu->save_f_total_time = htabent->f_counts.f_total_time;
1618 
1619 	/* save current backend-wide total time */
1620 	fcu->save_total = total_func_time;
1621 
1622 	/* get clock time as of function start */
1623 	INSTR_TIME_SET_CURRENT(fcu->f_start);
1624 }
1625 
1626 /*
1627  * find_funcstat_entry - find any existing PgStat_BackendFunctionEntry entry
1628  *		for specified function
1629  *
1630  * If no entry, return NULL, don't create a new one
1631  */
1632 PgStat_BackendFunctionEntry *
find_funcstat_entry(Oid func_id)1633 find_funcstat_entry(Oid func_id)
1634 {
1635 	if (pgStatFunctions == NULL)
1636 		return NULL;
1637 
1638 	return (PgStat_BackendFunctionEntry *) hash_search(pgStatFunctions,
1639 													   (void *) &func_id,
1640 													   HASH_FIND, NULL);
1641 }
1642 
1643 /*
1644  * Calculate function call usage and update stat counters.
1645  * Called by the executor after invoking a function.
1646  *
1647  * In the case of a set-returning function that runs in value-per-call mode,
1648  * we will see multiple pgstat_init_function_usage/pgstat_end_function_usage
1649  * calls for what the user considers a single call of the function.  The
1650  * finalize flag should be TRUE on the last call.
1651  */
1652 void
pgstat_end_function_usage(PgStat_FunctionCallUsage * fcu,bool finalize)1653 pgstat_end_function_usage(PgStat_FunctionCallUsage *fcu, bool finalize)
1654 {
1655 	PgStat_FunctionCounts *fs = fcu->fs;
1656 	instr_time	f_total;
1657 	instr_time	f_others;
1658 	instr_time	f_self;
1659 
1660 	/* stats not wanted? */
1661 	if (fs == NULL)
1662 		return;
1663 
1664 	/* total elapsed time in this function call */
1665 	INSTR_TIME_SET_CURRENT(f_total);
1666 	INSTR_TIME_SUBTRACT(f_total, fcu->f_start);
1667 
1668 	/* self usage: elapsed minus anything already charged to other calls */
1669 	f_others = total_func_time;
1670 	INSTR_TIME_SUBTRACT(f_others, fcu->save_total);
1671 	f_self = f_total;
1672 	INSTR_TIME_SUBTRACT(f_self, f_others);
1673 
1674 	/* update backend-wide total time */
1675 	INSTR_TIME_ADD(total_func_time, f_self);
1676 
1677 	/*
1678 	 * Compute the new f_total_time as the total elapsed time added to the
1679 	 * pre-call value of f_total_time.  This is necessary to avoid
1680 	 * double-counting any time taken by recursive calls of myself.  (We do
1681 	 * not need any similar kluge for self time, since that already excludes
1682 	 * any recursive calls.)
1683 	 */
1684 	INSTR_TIME_ADD(f_total, fcu->save_f_total_time);
1685 
1686 	/* update counters in function stats table */
1687 	if (finalize)
1688 		fs->f_numcalls++;
1689 	fs->f_total_time = f_total;
1690 	INSTR_TIME_ADD(fs->f_self_time, f_self);
1691 
1692 	/* indicate that we have something to send */
1693 	have_function_stats = true;
1694 }
1695 
1696 
1697 /* ----------
1698  * pgstat_initstats() -
1699  *
1700  *	Initialize a relcache entry to count access statistics.
1701  *	Called whenever a relation is opened.
1702  *
1703  *	We assume that a relcache entry's pgstat_info field is zeroed by
1704  *	relcache.c when the relcache entry is made; thereafter it is long-lived
1705  *	data.  We can avoid repeated searches of the TabStatus arrays when the
1706  *	same relation is touched repeatedly within a transaction.
1707  * ----------
1708  */
1709 void
pgstat_initstats(Relation rel)1710 pgstat_initstats(Relation rel)
1711 {
1712 	Oid			rel_id = rel->rd_id;
1713 	char		relkind = rel->rd_rel->relkind;
1714 
1715 	/* We only count stats for things that have storage */
1716 	if (!(relkind == RELKIND_RELATION ||
1717 		  relkind == RELKIND_MATVIEW ||
1718 		  relkind == RELKIND_INDEX ||
1719 		  relkind == RELKIND_TOASTVALUE ||
1720 		  relkind == RELKIND_SEQUENCE))
1721 	{
1722 		rel->pgstat_info = NULL;
1723 		return;
1724 	}
1725 
1726 	if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
1727 	{
1728 		/* We're not counting at all */
1729 		rel->pgstat_info = NULL;
1730 		return;
1731 	}
1732 
1733 	/*
1734 	 * If we already set up this relation in the current transaction, nothing
1735 	 * to do.
1736 	 */
1737 	if (rel->pgstat_info != NULL &&
1738 		rel->pgstat_info->t_id == rel_id)
1739 		return;
1740 
1741 	/* Else find or make the PgStat_TableStatus entry, and update link */
1742 	rel->pgstat_info = get_tabstat_entry(rel_id, rel->rd_rel->relisshared);
1743 }
1744 
1745 /*
1746  * get_tabstat_entry - find or create a PgStat_TableStatus entry for rel
1747  */
1748 static PgStat_TableStatus *
get_tabstat_entry(Oid rel_id,bool isshared)1749 get_tabstat_entry(Oid rel_id, bool isshared)
1750 {
1751 	TabStatHashEntry *hash_entry;
1752 	PgStat_TableStatus *entry;
1753 	TabStatusArray *tsa;
1754 	bool		found;
1755 
1756 	/*
1757 	 * Create hash table if we don't have it already.
1758 	 */
1759 	if (pgStatTabHash == NULL)
1760 	{
1761 		HASHCTL		ctl;
1762 
1763 		memset(&ctl, 0, sizeof(ctl));
1764 		ctl.keysize = sizeof(Oid);
1765 		ctl.entrysize = sizeof(TabStatHashEntry);
1766 
1767 		pgStatTabHash = hash_create("pgstat TabStatusArray lookup hash table",
1768 									TABSTAT_QUANTUM,
1769 									&ctl,
1770 									HASH_ELEM | HASH_BLOBS);
1771 	}
1772 
1773 	/*
1774 	 * Find an entry or create a new one.
1775 	 */
1776 	hash_entry = hash_search(pgStatTabHash, &rel_id, HASH_ENTER, &found);
1777 	if (!found)
1778 	{
1779 		/* initialize new entry with null pointer */
1780 		hash_entry->tsa_entry = NULL;
1781 	}
1782 
1783 	/*
1784 	 * If entry is already valid, we're done.
1785 	 */
1786 	if (hash_entry->tsa_entry)
1787 		return hash_entry->tsa_entry;
1788 
1789 	/*
1790 	 * Locate the first pgStatTabList entry with free space, making a new list
1791 	 * entry if needed.  Note that we could get an OOM failure here, but if so
1792 	 * we have left the hashtable and the list in a consistent state.
1793 	 */
1794 	if (pgStatTabList == NULL)
1795 	{
1796 		/* Set up first pgStatTabList entry */
1797 		pgStatTabList = (TabStatusArray *)
1798 			MemoryContextAllocZero(TopMemoryContext,
1799 								   sizeof(TabStatusArray));
1800 	}
1801 
1802 	tsa = pgStatTabList;
1803 	while (tsa->tsa_used >= TABSTAT_QUANTUM)
1804 	{
1805 		if (tsa->tsa_next == NULL)
1806 			tsa->tsa_next = (TabStatusArray *)
1807 				MemoryContextAllocZero(TopMemoryContext,
1808 									   sizeof(TabStatusArray));
1809 		tsa = tsa->tsa_next;
1810 	}
1811 
1812 	/*
1813 	 * Allocate a PgStat_TableStatus entry within this list entry.  We assume
1814 	 * the entry was already zeroed, either at creation or after last use.
1815 	 */
1816 	entry = &tsa->tsa_entries[tsa->tsa_used++];
1817 	entry->t_id = rel_id;
1818 	entry->t_shared = isshared;
1819 
1820 	/*
1821 	 * Now we can fill the entry in pgStatTabHash.
1822 	 */
1823 	hash_entry->tsa_entry = entry;
1824 
1825 	return entry;
1826 }
1827 
1828 /*
1829  * find_tabstat_entry - find any existing PgStat_TableStatus entry for rel
1830  *
1831  * If no entry, return NULL, don't create a new one
1832  *
1833  * Note: if we got an error in the most recent execution of pgstat_report_stat,
1834  * it's possible that an entry exists but there's no hashtable entry for it.
1835  * That's okay, we'll treat this case as "doesn't exist".
1836  */
1837 PgStat_TableStatus *
find_tabstat_entry(Oid rel_id)1838 find_tabstat_entry(Oid rel_id)
1839 {
1840 	TabStatHashEntry *hash_entry;
1841 
1842 	/* If hashtable doesn't exist, there are no entries at all */
1843 	if (!pgStatTabHash)
1844 		return NULL;
1845 
1846 	hash_entry = hash_search(pgStatTabHash, &rel_id, HASH_FIND, NULL);
1847 	if (!hash_entry)
1848 		return NULL;
1849 
1850 	/* Note that this step could also return NULL, but that's correct */
1851 	return hash_entry->tsa_entry;
1852 }
1853 
1854 /*
1855  * get_tabstat_stack_level - add a new (sub)transaction stack entry if needed
1856  */
1857 static PgStat_SubXactStatus *
get_tabstat_stack_level(int nest_level)1858 get_tabstat_stack_level(int nest_level)
1859 {
1860 	PgStat_SubXactStatus *xact_state;
1861 
1862 	xact_state = pgStatXactStack;
1863 	if (xact_state == NULL || xact_state->nest_level != nest_level)
1864 	{
1865 		xact_state = (PgStat_SubXactStatus *)
1866 			MemoryContextAlloc(TopTransactionContext,
1867 							   sizeof(PgStat_SubXactStatus));
1868 		xact_state->nest_level = nest_level;
1869 		xact_state->prev = pgStatXactStack;
1870 		xact_state->first = NULL;
1871 		pgStatXactStack = xact_state;
1872 	}
1873 	return xact_state;
1874 }
1875 
1876 /*
1877  * add_tabstat_xact_level - add a new (sub)transaction state record
1878  */
1879 static void
add_tabstat_xact_level(PgStat_TableStatus * pgstat_info,int nest_level)1880 add_tabstat_xact_level(PgStat_TableStatus *pgstat_info, int nest_level)
1881 {
1882 	PgStat_SubXactStatus *xact_state;
1883 	PgStat_TableXactStatus *trans;
1884 
1885 	/*
1886 	 * If this is the first rel to be modified at the current nest level, we
1887 	 * first have to push a transaction stack entry.
1888 	 */
1889 	xact_state = get_tabstat_stack_level(nest_level);
1890 
1891 	/* Now make a per-table stack entry */
1892 	trans = (PgStat_TableXactStatus *)
1893 		MemoryContextAllocZero(TopTransactionContext,
1894 							   sizeof(PgStat_TableXactStatus));
1895 	trans->nest_level = nest_level;
1896 	trans->upper = pgstat_info->trans;
1897 	trans->parent = pgstat_info;
1898 	trans->next = xact_state->first;
1899 	xact_state->first = trans;
1900 	pgstat_info->trans = trans;
1901 }
1902 
1903 /*
1904  * pgstat_count_heap_insert - count a tuple insertion of n tuples
1905  */
1906 void
pgstat_count_heap_insert(Relation rel,PgStat_Counter n)1907 pgstat_count_heap_insert(Relation rel, PgStat_Counter n)
1908 {
1909 	PgStat_TableStatus *pgstat_info = rel->pgstat_info;
1910 
1911 	if (pgstat_info != NULL)
1912 	{
1913 		/* We have to log the effect at the proper transactional level */
1914 		int			nest_level = GetCurrentTransactionNestLevel();
1915 
1916 		if (pgstat_info->trans == NULL ||
1917 			pgstat_info->trans->nest_level != nest_level)
1918 			add_tabstat_xact_level(pgstat_info, nest_level);
1919 
1920 		pgstat_info->trans->tuples_inserted += n;
1921 	}
1922 }
1923 
1924 /*
1925  * pgstat_count_heap_update - count a tuple update
1926  */
1927 void
pgstat_count_heap_update(Relation rel,bool hot)1928 pgstat_count_heap_update(Relation rel, bool hot)
1929 {
1930 	PgStat_TableStatus *pgstat_info = rel->pgstat_info;
1931 
1932 	if (pgstat_info != NULL)
1933 	{
1934 		/* We have to log the effect at the proper transactional level */
1935 		int			nest_level = GetCurrentTransactionNestLevel();
1936 
1937 		if (pgstat_info->trans == NULL ||
1938 			pgstat_info->trans->nest_level != nest_level)
1939 			add_tabstat_xact_level(pgstat_info, nest_level);
1940 
1941 		pgstat_info->trans->tuples_updated++;
1942 
1943 		/* t_tuples_hot_updated is nontransactional, so just advance it */
1944 		if (hot)
1945 			pgstat_info->t_counts.t_tuples_hot_updated++;
1946 	}
1947 }
1948 
1949 /*
1950  * pgstat_count_heap_delete - count a tuple deletion
1951  */
1952 void
pgstat_count_heap_delete(Relation rel)1953 pgstat_count_heap_delete(Relation rel)
1954 {
1955 	PgStat_TableStatus *pgstat_info = rel->pgstat_info;
1956 
1957 	if (pgstat_info != NULL)
1958 	{
1959 		/* We have to log the effect at the proper transactional level */
1960 		int			nest_level = GetCurrentTransactionNestLevel();
1961 
1962 		if (pgstat_info->trans == NULL ||
1963 			pgstat_info->trans->nest_level != nest_level)
1964 			add_tabstat_xact_level(pgstat_info, nest_level);
1965 
1966 		pgstat_info->trans->tuples_deleted++;
1967 	}
1968 }
1969 
1970 /*
1971  * pgstat_truncate_save_counters
1972  *
1973  * Whenever a table is truncated, we save its i/u/d counters so that they can
1974  * be cleared, and if the (sub)xact that executed the truncate later aborts,
1975  * the counters can be restored to the saved (pre-truncate) values.  Note we do
1976  * this on the first truncate in any particular subxact level only.
1977  */
1978 static void
pgstat_truncate_save_counters(PgStat_TableXactStatus * trans)1979 pgstat_truncate_save_counters(PgStat_TableXactStatus *trans)
1980 {
1981 	if (!trans->truncated)
1982 	{
1983 		trans->inserted_pre_trunc = trans->tuples_inserted;
1984 		trans->updated_pre_trunc = trans->tuples_updated;
1985 		trans->deleted_pre_trunc = trans->tuples_deleted;
1986 		trans->truncated = true;
1987 	}
1988 }
1989 
1990 /*
1991  * pgstat_truncate_restore_counters - restore counters when a truncate aborts
1992  */
1993 static void
pgstat_truncate_restore_counters(PgStat_TableXactStatus * trans)1994 pgstat_truncate_restore_counters(PgStat_TableXactStatus *trans)
1995 {
1996 	if (trans->truncated)
1997 	{
1998 		trans->tuples_inserted = trans->inserted_pre_trunc;
1999 		trans->tuples_updated = trans->updated_pre_trunc;
2000 		trans->tuples_deleted = trans->deleted_pre_trunc;
2001 	}
2002 }
2003 
2004 /*
2005  * pgstat_count_truncate - update tuple counters due to truncate
2006  */
2007 void
pgstat_count_truncate(Relation rel)2008 pgstat_count_truncate(Relation rel)
2009 {
2010 	PgStat_TableStatus *pgstat_info = rel->pgstat_info;
2011 
2012 	if (pgstat_info != NULL)
2013 	{
2014 		/* We have to log the effect at the proper transactional level */
2015 		int			nest_level = GetCurrentTransactionNestLevel();
2016 
2017 		if (pgstat_info->trans == NULL ||
2018 			pgstat_info->trans->nest_level != nest_level)
2019 			add_tabstat_xact_level(pgstat_info, nest_level);
2020 
2021 		pgstat_truncate_save_counters(pgstat_info->trans);
2022 		pgstat_info->trans->tuples_inserted = 0;
2023 		pgstat_info->trans->tuples_updated = 0;
2024 		pgstat_info->trans->tuples_deleted = 0;
2025 	}
2026 }
2027 
2028 /*
2029  * pgstat_update_heap_dead_tuples - update dead-tuples count
2030  *
2031  * The semantics of this are that we are reporting the nontransactional
2032  * recovery of "delta" dead tuples; so t_delta_dead_tuples decreases
2033  * rather than increasing, and the change goes straight into the per-table
2034  * counter, not into transactional state.
2035  */
2036 void
pgstat_update_heap_dead_tuples(Relation rel,int delta)2037 pgstat_update_heap_dead_tuples(Relation rel, int delta)
2038 {
2039 	PgStat_TableStatus *pgstat_info = rel->pgstat_info;
2040 
2041 	if (pgstat_info != NULL)
2042 		pgstat_info->t_counts.t_delta_dead_tuples -= delta;
2043 }
2044 
2045 
2046 /* ----------
2047  * AtEOXact_PgStat
2048  *
2049  *	Called from access/transam/xact.c at top-level transaction commit/abort.
2050  * ----------
2051  */
2052 void
AtEOXact_PgStat(bool isCommit,bool parallel)2053 AtEOXact_PgStat(bool isCommit, bool parallel)
2054 {
2055 	PgStat_SubXactStatus *xact_state;
2056 
2057 	/* Don't count parallel worker transaction stats */
2058 	if (!parallel)
2059 	{
2060 		/*
2061 		 * Count transaction commit or abort.  (We use counters, not just
2062 		 * bools, in case the reporting message isn't sent right away.)
2063 		 */
2064 		if (isCommit)
2065 			pgStatXactCommit++;
2066 		else
2067 			pgStatXactRollback++;
2068 	}
2069 
2070 	/*
2071 	 * Transfer transactional insert/update counts into the base tabstat
2072 	 * entries.  We don't bother to free any of the transactional state, since
2073 	 * it's all in TopTransactionContext and will go away anyway.
2074 	 */
2075 	xact_state = pgStatXactStack;
2076 	if (xact_state != NULL)
2077 	{
2078 		PgStat_TableXactStatus *trans;
2079 
2080 		Assert(xact_state->nest_level == 1);
2081 		Assert(xact_state->prev == NULL);
2082 		for (trans = xact_state->first; trans != NULL; trans = trans->next)
2083 		{
2084 			PgStat_TableStatus *tabstat;
2085 
2086 			Assert(trans->nest_level == 1);
2087 			Assert(trans->upper == NULL);
2088 			tabstat = trans->parent;
2089 			Assert(tabstat->trans == trans);
2090 			/* restore pre-truncate stats (if any) in case of aborted xact */
2091 			if (!isCommit)
2092 				pgstat_truncate_restore_counters(trans);
2093 			/* count attempted actions regardless of commit/abort */
2094 			tabstat->t_counts.t_tuples_inserted += trans->tuples_inserted;
2095 			tabstat->t_counts.t_tuples_updated += trans->tuples_updated;
2096 			tabstat->t_counts.t_tuples_deleted += trans->tuples_deleted;
2097 			if (isCommit)
2098 			{
2099 				tabstat->t_counts.t_truncated = trans->truncated;
2100 				if (trans->truncated)
2101 				{
2102 					/* forget live/dead stats seen by backend thus far */
2103 					tabstat->t_counts.t_delta_live_tuples = 0;
2104 					tabstat->t_counts.t_delta_dead_tuples = 0;
2105 				}
2106 				/* insert adds a live tuple, delete removes one */
2107 				tabstat->t_counts.t_delta_live_tuples +=
2108 					trans->tuples_inserted - trans->tuples_deleted;
2109 				/* update and delete each create a dead tuple */
2110 				tabstat->t_counts.t_delta_dead_tuples +=
2111 					trans->tuples_updated + trans->tuples_deleted;
2112 				/* insert, update, delete each count as one change event */
2113 				tabstat->t_counts.t_changed_tuples +=
2114 					trans->tuples_inserted + trans->tuples_updated +
2115 					trans->tuples_deleted;
2116 			}
2117 			else
2118 			{
2119 				/* inserted tuples are dead, deleted tuples are unaffected */
2120 				tabstat->t_counts.t_delta_dead_tuples +=
2121 					trans->tuples_inserted + trans->tuples_updated;
2122 				/* an aborted xact generates no changed_tuple events */
2123 			}
2124 			tabstat->trans = NULL;
2125 		}
2126 	}
2127 	pgStatXactStack = NULL;
2128 
2129 	/* Make sure any stats snapshot is thrown away */
2130 	pgstat_clear_snapshot();
2131 }
2132 
2133 /* ----------
2134  * AtEOSubXact_PgStat
2135  *
2136  *	Called from access/transam/xact.c at subtransaction commit/abort.
2137  * ----------
2138  */
2139 void
AtEOSubXact_PgStat(bool isCommit,int nestDepth)2140 AtEOSubXact_PgStat(bool isCommit, int nestDepth)
2141 {
2142 	PgStat_SubXactStatus *xact_state;
2143 
2144 	/*
2145 	 * Transfer transactional insert/update counts into the next higher
2146 	 * subtransaction state.
2147 	 */
2148 	xact_state = pgStatXactStack;
2149 	if (xact_state != NULL &&
2150 		xact_state->nest_level >= nestDepth)
2151 	{
2152 		PgStat_TableXactStatus *trans;
2153 		PgStat_TableXactStatus *next_trans;
2154 
2155 		/* delink xact_state from stack immediately to simplify reuse case */
2156 		pgStatXactStack = xact_state->prev;
2157 
2158 		for (trans = xact_state->first; trans != NULL; trans = next_trans)
2159 		{
2160 			PgStat_TableStatus *tabstat;
2161 
2162 			next_trans = trans->next;
2163 			Assert(trans->nest_level == nestDepth);
2164 			tabstat = trans->parent;
2165 			Assert(tabstat->trans == trans);
2166 			if (isCommit)
2167 			{
2168 				if (trans->upper && trans->upper->nest_level == nestDepth - 1)
2169 				{
2170 					if (trans->truncated)
2171 					{
2172 						/* propagate the truncate status one level up */
2173 						pgstat_truncate_save_counters(trans->upper);
2174 						/* replace upper xact stats with ours */
2175 						trans->upper->tuples_inserted = trans->tuples_inserted;
2176 						trans->upper->tuples_updated = trans->tuples_updated;
2177 						trans->upper->tuples_deleted = trans->tuples_deleted;
2178 					}
2179 					else
2180 					{
2181 						trans->upper->tuples_inserted += trans->tuples_inserted;
2182 						trans->upper->tuples_updated += trans->tuples_updated;
2183 						trans->upper->tuples_deleted += trans->tuples_deleted;
2184 					}
2185 					tabstat->trans = trans->upper;
2186 					pfree(trans);
2187 				}
2188 				else
2189 				{
2190 					/*
2191 					 * When there isn't an immediate parent state, we can just
2192 					 * reuse the record instead of going through a
2193 					 * palloc/pfree pushup (this works since it's all in
2194 					 * TopTransactionContext anyway).  We have to re-link it
2195 					 * into the parent level, though, and that might mean
2196 					 * pushing a new entry into the pgStatXactStack.
2197 					 */
2198 					PgStat_SubXactStatus *upper_xact_state;
2199 
2200 					upper_xact_state = get_tabstat_stack_level(nestDepth - 1);
2201 					trans->next = upper_xact_state->first;
2202 					upper_xact_state->first = trans;
2203 					trans->nest_level = nestDepth - 1;
2204 				}
2205 			}
2206 			else
2207 			{
2208 				/*
2209 				 * On abort, update top-level tabstat counts, then forget the
2210 				 * subtransaction
2211 				 */
2212 
2213 				/* first restore values obliterated by truncate */
2214 				pgstat_truncate_restore_counters(trans);
2215 				/* count attempted actions regardless of commit/abort */
2216 				tabstat->t_counts.t_tuples_inserted += trans->tuples_inserted;
2217 				tabstat->t_counts.t_tuples_updated += trans->tuples_updated;
2218 				tabstat->t_counts.t_tuples_deleted += trans->tuples_deleted;
2219 				/* inserted tuples are dead, deleted tuples are unaffected */
2220 				tabstat->t_counts.t_delta_dead_tuples +=
2221 					trans->tuples_inserted + trans->tuples_updated;
2222 				tabstat->trans = trans->upper;
2223 				pfree(trans);
2224 			}
2225 		}
2226 		pfree(xact_state);
2227 	}
2228 }
2229 
2230 
2231 /*
2232  * AtPrepare_PgStat
2233  *		Save the transactional stats state at 2PC transaction prepare.
2234  *
2235  * In this phase we just generate 2PC records for all the pending
2236  * transaction-dependent stats work.
2237  */
2238 void
AtPrepare_PgStat(void)2239 AtPrepare_PgStat(void)
2240 {
2241 	PgStat_SubXactStatus *xact_state;
2242 
2243 	xact_state = pgStatXactStack;
2244 	if (xact_state != NULL)
2245 	{
2246 		PgStat_TableXactStatus *trans;
2247 
2248 		Assert(xact_state->nest_level == 1);
2249 		Assert(xact_state->prev == NULL);
2250 		for (trans = xact_state->first; trans != NULL; trans = trans->next)
2251 		{
2252 			PgStat_TableStatus *tabstat;
2253 			TwoPhasePgStatRecord record;
2254 
2255 			Assert(trans->nest_level == 1);
2256 			Assert(trans->upper == NULL);
2257 			tabstat = trans->parent;
2258 			Assert(tabstat->trans == trans);
2259 
2260 			record.tuples_inserted = trans->tuples_inserted;
2261 			record.tuples_updated = trans->tuples_updated;
2262 			record.tuples_deleted = trans->tuples_deleted;
2263 			record.inserted_pre_trunc = trans->inserted_pre_trunc;
2264 			record.updated_pre_trunc = trans->updated_pre_trunc;
2265 			record.deleted_pre_trunc = trans->deleted_pre_trunc;
2266 			record.t_id = tabstat->t_id;
2267 			record.t_shared = tabstat->t_shared;
2268 			record.t_truncated = trans->truncated;
2269 
2270 			RegisterTwoPhaseRecord(TWOPHASE_RM_PGSTAT_ID, 0,
2271 								   &record, sizeof(TwoPhasePgStatRecord));
2272 		}
2273 	}
2274 }
2275 
2276 /*
2277  * PostPrepare_PgStat
2278  *		Clean up after successful PREPARE.
2279  *
2280  * All we need do here is unlink the transaction stats state from the
2281  * nontransactional state.  The nontransactional action counts will be
2282  * reported to the stats collector immediately, while the effects on live
2283  * and dead tuple counts are preserved in the 2PC state file.
2284  *
2285  * Note: AtEOXact_PgStat is not called during PREPARE.
2286  */
2287 void
PostPrepare_PgStat(void)2288 PostPrepare_PgStat(void)
2289 {
2290 	PgStat_SubXactStatus *xact_state;
2291 
2292 	/*
2293 	 * We don't bother to free any of the transactional state, since it's all
2294 	 * in TopTransactionContext and will go away anyway.
2295 	 */
2296 	xact_state = pgStatXactStack;
2297 	if (xact_state != NULL)
2298 	{
2299 		PgStat_TableXactStatus *trans;
2300 
2301 		for (trans = xact_state->first; trans != NULL; trans = trans->next)
2302 		{
2303 			PgStat_TableStatus *tabstat;
2304 
2305 			tabstat = trans->parent;
2306 			tabstat->trans = NULL;
2307 		}
2308 	}
2309 	pgStatXactStack = NULL;
2310 
2311 	/* Make sure any stats snapshot is thrown away */
2312 	pgstat_clear_snapshot();
2313 }
2314 
2315 /*
2316  * 2PC processing routine for COMMIT PREPARED case.
2317  *
2318  * Load the saved counts into our local pgstats state.
2319  */
2320 void
pgstat_twophase_postcommit(TransactionId xid,uint16 info,void * recdata,uint32 len)2321 pgstat_twophase_postcommit(TransactionId xid, uint16 info,
2322 						   void *recdata, uint32 len)
2323 {
2324 	TwoPhasePgStatRecord *rec = (TwoPhasePgStatRecord *) recdata;
2325 	PgStat_TableStatus *pgstat_info;
2326 
2327 	/* Find or create a tabstat entry for the rel */
2328 	pgstat_info = get_tabstat_entry(rec->t_id, rec->t_shared);
2329 
2330 	/* Same math as in AtEOXact_PgStat, commit case */
2331 	pgstat_info->t_counts.t_tuples_inserted += rec->tuples_inserted;
2332 	pgstat_info->t_counts.t_tuples_updated += rec->tuples_updated;
2333 	pgstat_info->t_counts.t_tuples_deleted += rec->tuples_deleted;
2334 	pgstat_info->t_counts.t_truncated = rec->t_truncated;
2335 	if (rec->t_truncated)
2336 	{
2337 		/* forget live/dead stats seen by backend thus far */
2338 		pgstat_info->t_counts.t_delta_live_tuples = 0;
2339 		pgstat_info->t_counts.t_delta_dead_tuples = 0;
2340 	}
2341 	pgstat_info->t_counts.t_delta_live_tuples +=
2342 		rec->tuples_inserted - rec->tuples_deleted;
2343 	pgstat_info->t_counts.t_delta_dead_tuples +=
2344 		rec->tuples_updated + rec->tuples_deleted;
2345 	pgstat_info->t_counts.t_changed_tuples +=
2346 		rec->tuples_inserted + rec->tuples_updated +
2347 		rec->tuples_deleted;
2348 }
2349 
2350 /*
2351  * 2PC processing routine for ROLLBACK PREPARED case.
2352  *
2353  * Load the saved counts into our local pgstats state, but treat them
2354  * as aborted.
2355  */
2356 void
pgstat_twophase_postabort(TransactionId xid,uint16 info,void * recdata,uint32 len)2357 pgstat_twophase_postabort(TransactionId xid, uint16 info,
2358 						  void *recdata, uint32 len)
2359 {
2360 	TwoPhasePgStatRecord *rec = (TwoPhasePgStatRecord *) recdata;
2361 	PgStat_TableStatus *pgstat_info;
2362 
2363 	/* Find or create a tabstat entry for the rel */
2364 	pgstat_info = get_tabstat_entry(rec->t_id, rec->t_shared);
2365 
2366 	/* Same math as in AtEOXact_PgStat, abort case */
2367 	if (rec->t_truncated)
2368 	{
2369 		rec->tuples_inserted = rec->inserted_pre_trunc;
2370 		rec->tuples_updated = rec->updated_pre_trunc;
2371 		rec->tuples_deleted = rec->deleted_pre_trunc;
2372 	}
2373 	pgstat_info->t_counts.t_tuples_inserted += rec->tuples_inserted;
2374 	pgstat_info->t_counts.t_tuples_updated += rec->tuples_updated;
2375 	pgstat_info->t_counts.t_tuples_deleted += rec->tuples_deleted;
2376 	pgstat_info->t_counts.t_delta_dead_tuples +=
2377 		rec->tuples_inserted + rec->tuples_updated;
2378 }
2379 
2380 
2381 /* ----------
2382  * pgstat_fetch_stat_dbentry() -
2383  *
2384  *	Support function for the SQL-callable pgstat* functions. Returns
2385  *	the collected statistics for one database or NULL. NULL doesn't mean
2386  *	that the database doesn't exist, it is just not yet known by the
2387  *	collector, so the caller is better off to report ZERO instead.
2388  * ----------
2389  */
2390 PgStat_StatDBEntry *
pgstat_fetch_stat_dbentry(Oid dbid)2391 pgstat_fetch_stat_dbentry(Oid dbid)
2392 {
2393 	/*
2394 	 * If not done for this transaction, read the statistics collector stats
2395 	 * file into some hash tables.
2396 	 */
2397 	backend_read_statsfile();
2398 
2399 	/*
2400 	 * Lookup the requested database; return NULL if not found
2401 	 */
2402 	return (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2403 											  (void *) &dbid,
2404 											  HASH_FIND, NULL);
2405 }
2406 
2407 
2408 /* ----------
2409  * pgstat_fetch_stat_tabentry() -
2410  *
2411  *	Support function for the SQL-callable pgstat* functions. Returns
2412  *	the collected statistics for one table or NULL. NULL doesn't mean
2413  *	that the table doesn't exist, it is just not yet known by the
2414  *	collector, so the caller is better off to report ZERO instead.
2415  * ----------
2416  */
2417 PgStat_StatTabEntry *
pgstat_fetch_stat_tabentry(Oid relid)2418 pgstat_fetch_stat_tabentry(Oid relid)
2419 {
2420 	Oid			dbid;
2421 	PgStat_StatDBEntry *dbentry;
2422 	PgStat_StatTabEntry *tabentry;
2423 
2424 	/*
2425 	 * If not done for this transaction, read the statistics collector stats
2426 	 * file into some hash tables.
2427 	 */
2428 	backend_read_statsfile();
2429 
2430 	/*
2431 	 * Lookup our database, then look in its table hash table.
2432 	 */
2433 	dbid = MyDatabaseId;
2434 	dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2435 												 (void *) &dbid,
2436 												 HASH_FIND, NULL);
2437 	if (dbentry != NULL && dbentry->tables != NULL)
2438 	{
2439 		tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
2440 													   (void *) &relid,
2441 													   HASH_FIND, NULL);
2442 		if (tabentry)
2443 			return tabentry;
2444 	}
2445 
2446 	/*
2447 	 * If we didn't find it, maybe it's a shared table.
2448 	 */
2449 	dbid = InvalidOid;
2450 	dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2451 												 (void *) &dbid,
2452 												 HASH_FIND, NULL);
2453 	if (dbentry != NULL && dbentry->tables != NULL)
2454 	{
2455 		tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
2456 													   (void *) &relid,
2457 													   HASH_FIND, NULL);
2458 		if (tabentry)
2459 			return tabentry;
2460 	}
2461 
2462 	return NULL;
2463 }
2464 
2465 
2466 /* ----------
2467  * pgstat_fetch_stat_funcentry() -
2468  *
2469  *	Support function for the SQL-callable pgstat* functions. Returns
2470  *	the collected statistics for one function or NULL.
2471  * ----------
2472  */
2473 PgStat_StatFuncEntry *
pgstat_fetch_stat_funcentry(Oid func_id)2474 pgstat_fetch_stat_funcentry(Oid func_id)
2475 {
2476 	PgStat_StatDBEntry *dbentry;
2477 	PgStat_StatFuncEntry *funcentry = NULL;
2478 
2479 	/* load the stats file if needed */
2480 	backend_read_statsfile();
2481 
2482 	/* Lookup our database, then find the requested function.  */
2483 	dbentry = pgstat_fetch_stat_dbentry(MyDatabaseId);
2484 	if (dbentry != NULL && dbentry->functions != NULL)
2485 	{
2486 		funcentry = (PgStat_StatFuncEntry *) hash_search(dbentry->functions,
2487 														 (void *) &func_id,
2488 														 HASH_FIND, NULL);
2489 	}
2490 
2491 	return funcentry;
2492 }
2493 
2494 
2495 /* ----------
2496  * pgstat_fetch_stat_beentry() -
2497  *
2498  *	Support function for the SQL-callable pgstat* functions. Returns
2499  *	our local copy of the current-activity entry for one backend.
2500  *
2501  *	NB: caller is responsible for a check if the user is permitted to see
2502  *	this info (especially the querystring).
2503  * ----------
2504  */
2505 PgBackendStatus *
pgstat_fetch_stat_beentry(int beid)2506 pgstat_fetch_stat_beentry(int beid)
2507 {
2508 	pgstat_read_current_status();
2509 
2510 	if (beid < 1 || beid > localNumBackends)
2511 		return NULL;
2512 
2513 	return &localBackendStatusTable[beid - 1].backendStatus;
2514 }
2515 
2516 
2517 /* ----------
2518  * pgstat_fetch_stat_local_beentry() -
2519  *
2520  *	Like pgstat_fetch_stat_beentry() but with locally computed additions (like
2521  *	xid and xmin values of the backend)
2522  *
2523  *	NB: caller is responsible for a check if the user is permitted to see
2524  *	this info (especially the querystring).
2525  * ----------
2526  */
2527 LocalPgBackendStatus *
pgstat_fetch_stat_local_beentry(int beid)2528 pgstat_fetch_stat_local_beentry(int beid)
2529 {
2530 	pgstat_read_current_status();
2531 
2532 	if (beid < 1 || beid > localNumBackends)
2533 		return NULL;
2534 
2535 	return &localBackendStatusTable[beid - 1];
2536 }
2537 
2538 
2539 /* ----------
2540  * pgstat_fetch_stat_numbackends() -
2541  *
2542  *	Support function for the SQL-callable pgstat* functions. Returns
2543  *	the maximum current backend id.
2544  * ----------
2545  */
2546 int
pgstat_fetch_stat_numbackends(void)2547 pgstat_fetch_stat_numbackends(void)
2548 {
2549 	pgstat_read_current_status();
2550 
2551 	return localNumBackends;
2552 }
2553 
2554 /*
2555  * ---------
2556  * pgstat_fetch_stat_archiver() -
2557  *
2558  *	Support function for the SQL-callable pgstat* functions. Returns
2559  *	a pointer to the archiver statistics struct.
2560  * ---------
2561  */
2562 PgStat_ArchiverStats *
pgstat_fetch_stat_archiver(void)2563 pgstat_fetch_stat_archiver(void)
2564 {
2565 	backend_read_statsfile();
2566 
2567 	return &archiverStats;
2568 }
2569 
2570 
2571 /*
2572  * ---------
2573  * pgstat_fetch_global() -
2574  *
2575  *	Support function for the SQL-callable pgstat* functions. Returns
2576  *	a pointer to the global statistics struct.
2577  * ---------
2578  */
2579 PgStat_GlobalStats *
pgstat_fetch_global(void)2580 pgstat_fetch_global(void)
2581 {
2582 	backend_read_statsfile();
2583 
2584 	return &globalStats;
2585 }
2586 
2587 
2588 /* ------------------------------------------------------------
2589  * Functions for management of the shared-memory PgBackendStatus array
2590  * ------------------------------------------------------------
2591  */
2592 
2593 static PgBackendStatus *BackendStatusArray = NULL;
2594 static PgBackendStatus *MyBEEntry = NULL;
2595 static char *BackendAppnameBuffer = NULL;
2596 static char *BackendClientHostnameBuffer = NULL;
2597 static char *BackendActivityBuffer = NULL;
2598 static Size BackendActivityBufferSize = 0;
2599 #ifdef USE_SSL
2600 static PgBackendSSLStatus *BackendSslStatusBuffer = NULL;
2601 #endif
2602 
2603 
2604 /*
2605  * Report shared-memory space needed by CreateSharedBackendStatus.
2606  */
2607 Size
BackendStatusShmemSize(void)2608 BackendStatusShmemSize(void)
2609 {
2610 	Size		size;
2611 
2612 	/* BackendStatusArray: */
2613 	size = mul_size(sizeof(PgBackendStatus), NumBackendStatSlots);
2614 	/* BackendAppnameBuffer: */
2615 	size = add_size(size,
2616 					mul_size(NAMEDATALEN, NumBackendStatSlots));
2617 	/* BackendClientHostnameBuffer: */
2618 	size = add_size(size,
2619 					mul_size(NAMEDATALEN, NumBackendStatSlots));
2620 	/* BackendActivityBuffer: */
2621 	size = add_size(size,
2622 					mul_size(pgstat_track_activity_query_size, NumBackendStatSlots));
2623 #ifdef USE_SSL
2624 	/* BackendSslStatusBuffer: */
2625 	size = add_size(size,
2626 					mul_size(sizeof(PgBackendSSLStatus), NumBackendStatSlots));
2627 #endif
2628 	return size;
2629 }
2630 
2631 /*
2632  * Initialize the shared status array and several string buffers
2633  * during postmaster startup.
2634  */
2635 void
CreateSharedBackendStatus(void)2636 CreateSharedBackendStatus(void)
2637 {
2638 	Size		size;
2639 	bool		found;
2640 	int			i;
2641 	char	   *buffer;
2642 
2643 	/* Create or attach to the shared array */
2644 	size = mul_size(sizeof(PgBackendStatus), NumBackendStatSlots);
2645 	BackendStatusArray = (PgBackendStatus *)
2646 		ShmemInitStruct("Backend Status Array", size, &found);
2647 
2648 	if (!found)
2649 	{
2650 		/*
2651 		 * We're the first - initialize.
2652 		 */
2653 		MemSet(BackendStatusArray, 0, size);
2654 	}
2655 
2656 	/* Create or attach to the shared appname buffer */
2657 	size = mul_size(NAMEDATALEN, NumBackendStatSlots);
2658 	BackendAppnameBuffer = (char *)
2659 		ShmemInitStruct("Backend Application Name Buffer", size, &found);
2660 
2661 	if (!found)
2662 	{
2663 		MemSet(BackendAppnameBuffer, 0, size);
2664 
2665 		/* Initialize st_appname pointers. */
2666 		buffer = BackendAppnameBuffer;
2667 		for (i = 0; i < NumBackendStatSlots; i++)
2668 		{
2669 			BackendStatusArray[i].st_appname = buffer;
2670 			buffer += NAMEDATALEN;
2671 		}
2672 	}
2673 
2674 	/* Create or attach to the shared client hostname buffer */
2675 	size = mul_size(NAMEDATALEN, NumBackendStatSlots);
2676 	BackendClientHostnameBuffer = (char *)
2677 		ShmemInitStruct("Backend Client Host Name Buffer", size, &found);
2678 
2679 	if (!found)
2680 	{
2681 		MemSet(BackendClientHostnameBuffer, 0, size);
2682 
2683 		/* Initialize st_clienthostname pointers. */
2684 		buffer = BackendClientHostnameBuffer;
2685 		for (i = 0; i < NumBackendStatSlots; i++)
2686 		{
2687 			BackendStatusArray[i].st_clienthostname = buffer;
2688 			buffer += NAMEDATALEN;
2689 		}
2690 	}
2691 
2692 	/* Create or attach to the shared activity buffer */
2693 	BackendActivityBufferSize = mul_size(pgstat_track_activity_query_size,
2694 										 NumBackendStatSlots);
2695 	BackendActivityBuffer = (char *)
2696 		ShmemInitStruct("Backend Activity Buffer",
2697 						BackendActivityBufferSize,
2698 						&found);
2699 
2700 	if (!found)
2701 	{
2702 		MemSet(BackendActivityBuffer, 0, BackendActivityBufferSize);
2703 
2704 		/* Initialize st_activity pointers. */
2705 		buffer = BackendActivityBuffer;
2706 		for (i = 0; i < NumBackendStatSlots; i++)
2707 		{
2708 			BackendStatusArray[i].st_activity = buffer;
2709 			buffer += pgstat_track_activity_query_size;
2710 		}
2711 	}
2712 
2713 #ifdef USE_SSL
2714 	/* Create or attach to the shared SSL status buffer */
2715 	size = mul_size(sizeof(PgBackendSSLStatus), NumBackendStatSlots);
2716 	BackendSslStatusBuffer = (PgBackendSSLStatus *)
2717 		ShmemInitStruct("Backend SSL Status Buffer", size, &found);
2718 
2719 	if (!found)
2720 	{
2721 		PgBackendSSLStatus *ptr;
2722 
2723 		MemSet(BackendSslStatusBuffer, 0, size);
2724 
2725 		/* Initialize st_sslstatus pointers. */
2726 		ptr = BackendSslStatusBuffer;
2727 		for (i = 0; i < NumBackendStatSlots; i++)
2728 		{
2729 			BackendStatusArray[i].st_sslstatus = ptr;
2730 			ptr++;
2731 		}
2732 	}
2733 #endif
2734 }
2735 
2736 
2737 /* ----------
2738  * pgstat_initialize() -
2739  *
2740  *	Initialize pgstats state, and set up our on-proc-exit hook.
2741  *	Called from InitPostgres and AuxiliaryProcessMain. For auxiliary process,
2742  *	MyBackendId is invalid. Otherwise, MyBackendId must be set,
2743  *	but we must not have started any transaction yet (since the
2744  *	exit hook must run after the last transaction exit).
2745  *	NOTE: MyDatabaseId isn't set yet; so the shutdown hook has to be careful.
2746  * ----------
2747  */
2748 void
pgstat_initialize(void)2749 pgstat_initialize(void)
2750 {
2751 	/* Initialize MyBEEntry */
2752 	if (MyBackendId != InvalidBackendId)
2753 	{
2754 		Assert(MyBackendId >= 1 && MyBackendId <= MaxBackends);
2755 		MyBEEntry = &BackendStatusArray[MyBackendId - 1];
2756 	}
2757 	else
2758 	{
2759 		/* Must be an auxiliary process */
2760 		Assert(MyAuxProcType != NotAnAuxProcess);
2761 
2762 		/*
2763 		 * Assign the MyBEEntry for an auxiliary process.  Since it doesn't
2764 		 * have a BackendId, the slot is statically allocated based on the
2765 		 * auxiliary process type (MyAuxProcType).  Backends use slots indexed
2766 		 * in the range from 1 to MaxBackends (inclusive), so we use
2767 		 * MaxBackends + AuxBackendType + 1 as the index of the slot for an
2768 		 * auxiliary process.
2769 		 */
2770 		MyBEEntry = &BackendStatusArray[MaxBackends + MyAuxProcType];
2771 	}
2772 
2773 	/* Set up a process-exit hook to clean up */
2774 	on_shmem_exit(pgstat_beshutdown_hook, 0);
2775 }
2776 
2777 /* ----------
2778  * pgstat_bestart() -
2779  *
2780  *	Initialize this backend's entry in the PgBackendStatus array.
2781  *	Called from InitPostgres.
2782  *
2783  *	Apart from auxiliary processes, MyBackendId, MyDatabaseId,
2784  *	session userid, and application_name must be set for a
2785  *	backend (hence, this cannot be combined with pgstat_initialize).
2786  *	Note also that we must be inside a transaction if this isn't an aux
2787  *	process, as we may need to do encoding conversion on some strings.
2788  * ----------
2789  */
2790 void
pgstat_bestart(void)2791 pgstat_bestart(void)
2792 {
2793 	volatile PgBackendStatus *vbeentry = MyBEEntry;
2794 	PgBackendStatus lbeentry;
2795 #ifdef USE_SSL
2796 	PgBackendSSLStatus lsslstatus;
2797 #endif
2798 
2799 	/* pgstats state must be initialized from pgstat_initialize() */
2800 	Assert(vbeentry != NULL);
2801 
2802 	/*
2803 	 * To minimize the time spent modifying the PgBackendStatus entry, and
2804 	 * avoid risk of errors inside the critical section, we first copy the
2805 	 * shared-memory struct to a local variable, then modify the data in the
2806 	 * local variable, then copy the local variable back to shared memory.
2807 	 * Only the last step has to be inside the critical section.
2808 	 *
2809 	 * Most of the data we copy from shared memory is just going to be
2810 	 * overwritten, but the struct's not so large that it's worth the
2811 	 * maintenance hassle to copy only the needful fields.
2812 	 */
2813 	memcpy(&lbeentry,
2814 		   (char *) vbeentry,
2815 		   sizeof(PgBackendStatus));
2816 
2817 	/* This struct can just start from zeroes each time, though */
2818 #ifdef USE_SSL
2819 	memset(&lsslstatus, 0, sizeof(lsslstatus));
2820 #endif
2821 
2822 	/*
2823 	 * Now fill in all the fields of lbeentry, except for strings that are
2824 	 * out-of-line data.  Those have to be handled separately, below.
2825 	 */
2826 	lbeentry.st_procpid = MyProcPid;
2827 
2828 	if (MyBackendId != InvalidBackendId)
2829 	{
2830 		if (IsAutoVacuumLauncherProcess())
2831 		{
2832 			/* Autovacuum Launcher */
2833 			lbeentry.st_backendType = B_AUTOVAC_LAUNCHER;
2834 		}
2835 		else if (IsAutoVacuumWorkerProcess())
2836 		{
2837 			/* Autovacuum Worker */
2838 			lbeentry.st_backendType = B_AUTOVAC_WORKER;
2839 		}
2840 		else if (am_walsender)
2841 		{
2842 			/* Wal sender */
2843 			lbeentry.st_backendType = B_WAL_SENDER;
2844 		}
2845 		else if (IsBackgroundWorker)
2846 		{
2847 			/* bgworker */
2848 			lbeentry.st_backendType = B_BG_WORKER;
2849 		}
2850 		else
2851 		{
2852 			/* client-backend */
2853 			lbeentry.st_backendType = B_BACKEND;
2854 		}
2855 	}
2856 	else
2857 	{
2858 		/* Must be an auxiliary process */
2859 		Assert(MyAuxProcType != NotAnAuxProcess);
2860 		switch (MyAuxProcType)
2861 		{
2862 			case StartupProcess:
2863 				lbeentry.st_backendType = B_STARTUP;
2864 				break;
2865 			case BgWriterProcess:
2866 				lbeentry.st_backendType = B_BG_WRITER;
2867 				break;
2868 			case CheckpointerProcess:
2869 				lbeentry.st_backendType = B_CHECKPOINTER;
2870 				break;
2871 			case WalWriterProcess:
2872 				lbeentry.st_backendType = B_WAL_WRITER;
2873 				break;
2874 			case WalReceiverProcess:
2875 				lbeentry.st_backendType = B_WAL_RECEIVER;
2876 				break;
2877 			default:
2878 				elog(FATAL, "unrecognized process type: %d",
2879 					 (int) MyAuxProcType);
2880 		}
2881 	}
2882 
2883 	/*
2884 	 * If we have a MyProcPort, use its session start time (for consistency,
2885 	 * and to save a kernel call).
2886 	 */
2887 	if (MyProcPort)
2888 		lbeentry.st_proc_start_timestamp = MyProcPort->SessionStartTime;
2889 	else
2890 		lbeentry.st_proc_start_timestamp = GetCurrentTimestamp();
2891 
2892 	lbeentry.st_activity_start_timestamp = 0;
2893 	lbeentry.st_state_start_timestamp = 0;
2894 	lbeentry.st_xact_start_timestamp = 0;
2895 	lbeentry.st_databaseid = MyDatabaseId;
2896 
2897 	/* We have userid for client-backends, wal-sender and bgworker processes */
2898 	if (lbeentry.st_backendType == B_BACKEND
2899 		|| lbeentry.st_backendType == B_WAL_SENDER
2900 		|| lbeentry.st_backendType == B_BG_WORKER)
2901 		lbeentry.st_userid = GetSessionUserId();
2902 	else
2903 		lbeentry.st_userid = InvalidOid;
2904 
2905 	/*
2906 	 * We may not have a MyProcPort (eg, if this is the autovacuum process).
2907 	 * If so, use all-zeroes client address, which is dealt with specially in
2908 	 * pg_stat_get_backend_client_addr and pg_stat_get_backend_client_port.
2909 	 */
2910 	if (MyProcPort)
2911 		memcpy(&lbeentry.st_clientaddr, &MyProcPort->raddr,
2912 			   sizeof(lbeentry.st_clientaddr));
2913 	else
2914 		MemSet(&lbeentry.st_clientaddr, 0, sizeof(lbeentry.st_clientaddr));
2915 
2916 #ifdef USE_SSL
2917 	if (MyProcPort && MyProcPort->ssl != NULL)
2918 	{
2919 		lbeentry.st_ssl = true;
2920 		lsslstatus.ssl_bits = be_tls_get_cipher_bits(MyProcPort);
2921 		lsslstatus.ssl_compression = be_tls_get_compression(MyProcPort);
2922 		be_tls_get_version(MyProcPort, lsslstatus.ssl_version, NAMEDATALEN);
2923 		be_tls_get_cipher(MyProcPort, lsslstatus.ssl_cipher, NAMEDATALEN);
2924 		be_tls_get_peerdn_name(MyProcPort, lsslstatus.ssl_clientdn, NAMEDATALEN);
2925 	}
2926 	else
2927 	{
2928 		lbeentry.st_ssl = false;
2929 	}
2930 #else
2931 	lbeentry.st_ssl = false;
2932 #endif
2933 
2934 	lbeentry.st_state = STATE_UNDEFINED;
2935 	lbeentry.st_progress_command = PROGRESS_COMMAND_INVALID;
2936 	lbeentry.st_progress_command_target = InvalidOid;
2937 
2938 	/*
2939 	 * we don't zero st_progress_param here to save cycles; nobody should
2940 	 * examine it until st_progress_command has been set to something other
2941 	 * than PROGRESS_COMMAND_INVALID
2942 	 */
2943 
2944 	/*
2945 	 * We're ready to enter the critical section that fills the shared-memory
2946 	 * status entry.  We follow the protocol of bumping st_changecount before
2947 	 * and after; and make sure it's even afterwards.  We use a volatile
2948 	 * pointer here to ensure the compiler doesn't try to get cute.
2949 	 */
2950 	PGSTAT_BEGIN_WRITE_ACTIVITY(vbeentry);
2951 
2952 	/* make sure we'll memcpy the same st_changecount back */
2953 	lbeentry.st_changecount = vbeentry->st_changecount;
2954 
2955 	memcpy((char *) vbeentry,
2956 		   &lbeentry,
2957 		   sizeof(PgBackendStatus));
2958 
2959 	/*
2960 	 * We can write the out-of-line strings and structs using the pointers
2961 	 * that are in lbeentry; this saves some de-volatilizing messiness.
2962 	 */
2963 	lbeentry.st_appname[0] = '\0';
2964 	if (MyProcPort && MyProcPort->remote_hostname)
2965 		strlcpy(lbeentry.st_clienthostname, MyProcPort->remote_hostname,
2966 				NAMEDATALEN);
2967 	else
2968 		lbeentry.st_clienthostname[0] = '\0';
2969 	lbeentry.st_activity[0] = '\0';
2970 	/* Also make sure the last byte in each string area is always 0 */
2971 	lbeentry.st_appname[NAMEDATALEN - 1] = '\0';
2972 	lbeentry.st_clienthostname[NAMEDATALEN - 1] = '\0';
2973 	lbeentry.st_activity[pgstat_track_activity_query_size - 1] = '\0';
2974 
2975 #ifdef USE_SSL
2976 	memcpy(lbeentry.st_sslstatus, &lsslstatus, sizeof(PgBackendSSLStatus));
2977 #endif
2978 
2979 	PGSTAT_END_WRITE_ACTIVITY(vbeentry);
2980 
2981 	/* Update app name to current GUC setting */
2982 	if (application_name)
2983 		pgstat_report_appname(application_name);
2984 }
2985 
2986 /*
2987  * Shut down a single backend's statistics reporting at process exit.
2988  *
2989  * Flush any remaining statistics counts out to the collector.
2990  * Without this, operations triggered during backend exit (such as
2991  * temp table deletions) won't be counted.
2992  *
2993  * Lastly, clear out our entry in the PgBackendStatus array.
2994  */
2995 static void
pgstat_beshutdown_hook(int code,Datum arg)2996 pgstat_beshutdown_hook(int code, Datum arg)
2997 {
2998 	volatile PgBackendStatus *beentry = MyBEEntry;
2999 
3000 	/*
3001 	 * If we got as far as discovering our own database ID, we can report what
3002 	 * we did to the collector.  Otherwise, we'd be sending an invalid
3003 	 * database ID, so forget it.  (This means that accesses to pg_database
3004 	 * during failed backend starts might never get counted.)
3005 	 */
3006 	if (OidIsValid(MyDatabaseId))
3007 		pgstat_report_stat(true);
3008 
3009 	/*
3010 	 * Clear my status entry, following the protocol of bumping st_changecount
3011 	 * before and after.  We use a volatile pointer here to ensure the
3012 	 * compiler doesn't try to get cute.
3013 	 */
3014 	PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
3015 
3016 	beentry->st_procpid = 0;	/* mark invalid */
3017 
3018 	PGSTAT_END_WRITE_ACTIVITY(beentry);
3019 }
3020 
3021 
3022 /* ----------
3023  * pgstat_report_activity() -
3024  *
3025  *	Called from tcop/postgres.c to report what the backend is actually doing
3026  *	(but note cmd_str can be NULL for certain cases).
3027  *
3028  * All updates of the status entry follow the protocol of bumping
3029  * st_changecount before and after.  We use a volatile pointer here to
3030  * ensure the compiler doesn't try to get cute.
3031  * ----------
3032  */
3033 void
pgstat_report_activity(BackendState state,const char * cmd_str)3034 pgstat_report_activity(BackendState state, const char *cmd_str)
3035 {
3036 	volatile PgBackendStatus *beentry = MyBEEntry;
3037 	TimestampTz start_timestamp;
3038 	TimestampTz current_timestamp;
3039 	int			len = 0;
3040 
3041 	TRACE_POSTGRESQL_STATEMENT_STATUS(cmd_str);
3042 
3043 	if (!beentry)
3044 		return;
3045 
3046 	if (!pgstat_track_activities)
3047 	{
3048 		if (beentry->st_state != STATE_DISABLED)
3049 		{
3050 			volatile PGPROC *proc = MyProc;
3051 
3052 			/*
3053 			 * track_activities is disabled, but we last reported a
3054 			 * non-disabled state.  As our final update, change the state and
3055 			 * clear fields we will not be updating anymore.
3056 			 */
3057 			PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
3058 			beentry->st_state = STATE_DISABLED;
3059 			beentry->st_state_start_timestamp = 0;
3060 			beentry->st_activity[0] = '\0';
3061 			beentry->st_activity_start_timestamp = 0;
3062 			/* st_xact_start_timestamp and wait_event_info are also disabled */
3063 			beentry->st_xact_start_timestamp = 0;
3064 			proc->wait_event_info = 0;
3065 			PGSTAT_END_WRITE_ACTIVITY(beentry);
3066 		}
3067 		return;
3068 	}
3069 
3070 	/*
3071 	 * To minimize the time spent modifying the entry, and avoid risk of
3072 	 * errors inside the critical section, fetch all the needed data first.
3073 	 */
3074 	start_timestamp = GetCurrentStatementStartTimestamp();
3075 	if (cmd_str != NULL)
3076 	{
3077 		len = pg_mbcliplen(cmd_str, strlen(cmd_str),
3078 						   pgstat_track_activity_query_size - 1);
3079 	}
3080 	current_timestamp = GetCurrentTimestamp();
3081 
3082 	/*
3083 	 * Now update the status entry
3084 	 */
3085 	PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
3086 
3087 	beentry->st_state = state;
3088 	beentry->st_state_start_timestamp = current_timestamp;
3089 
3090 	if (cmd_str != NULL)
3091 	{
3092 		memcpy((char *) beentry->st_activity, cmd_str, len);
3093 		beentry->st_activity[len] = '\0';
3094 		beentry->st_activity_start_timestamp = start_timestamp;
3095 	}
3096 
3097 	PGSTAT_END_WRITE_ACTIVITY(beentry);
3098 }
3099 
3100 /*-----------
3101  * pgstat_progress_start_command() -
3102  *
3103  * Set st_progress_command (and st_progress_command_target) in own backend
3104  * entry.  Also, zero-initialize st_progress_param array.
3105  *-----------
3106  */
3107 void
pgstat_progress_start_command(ProgressCommandType cmdtype,Oid relid)3108 pgstat_progress_start_command(ProgressCommandType cmdtype, Oid relid)
3109 {
3110 	volatile PgBackendStatus *beentry = MyBEEntry;
3111 
3112 	if (!beentry || !pgstat_track_activities)
3113 		return;
3114 
3115 	PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
3116 	beentry->st_progress_command = cmdtype;
3117 	beentry->st_progress_command_target = relid;
3118 	MemSet(&beentry->st_progress_param, 0, sizeof(beentry->st_progress_param));
3119 	PGSTAT_END_WRITE_ACTIVITY(beentry);
3120 }
3121 
3122 /*-----------
3123  * pgstat_progress_update_param() -
3124  *
3125  * Update index'th member in st_progress_param[] of own backend entry.
3126  *-----------
3127  */
3128 void
pgstat_progress_update_param(int index,int64 val)3129 pgstat_progress_update_param(int index, int64 val)
3130 {
3131 	volatile PgBackendStatus *beentry = MyBEEntry;
3132 
3133 	Assert(index >= 0 && index < PGSTAT_NUM_PROGRESS_PARAM);
3134 
3135 	if (!beentry || !pgstat_track_activities)
3136 		return;
3137 
3138 	PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
3139 	beentry->st_progress_param[index] = val;
3140 	PGSTAT_END_WRITE_ACTIVITY(beentry);
3141 }
3142 
3143 /*-----------
3144  * pgstat_progress_update_multi_param() -
3145  *
3146  * Update multiple members in st_progress_param[] of own backend entry.
3147  * This is atomic; readers won't see intermediate states.
3148  *-----------
3149  */
3150 void
pgstat_progress_update_multi_param(int nparam,const int * index,const int64 * val)3151 pgstat_progress_update_multi_param(int nparam, const int *index,
3152 								   const int64 *val)
3153 {
3154 	volatile PgBackendStatus *beentry = MyBEEntry;
3155 	int			i;
3156 
3157 	if (!beentry || !pgstat_track_activities || nparam == 0)
3158 		return;
3159 
3160 	PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
3161 
3162 	for (i = 0; i < nparam; ++i)
3163 	{
3164 		Assert(index[i] >= 0 && index[i] < PGSTAT_NUM_PROGRESS_PARAM);
3165 
3166 		beentry->st_progress_param[index[i]] = val[i];
3167 	}
3168 
3169 	PGSTAT_END_WRITE_ACTIVITY(beentry);
3170 }
3171 
3172 /*-----------
3173  * pgstat_progress_end_command() -
3174  *
3175  * Reset st_progress_command (and st_progress_command_target) in own backend
3176  * entry.  This signals the end of the command.
3177  *-----------
3178  */
3179 void
pgstat_progress_end_command(void)3180 pgstat_progress_end_command(void)
3181 {
3182 	volatile PgBackendStatus *beentry = MyBEEntry;
3183 
3184 	if (!beentry || !pgstat_track_activities)
3185 		return;
3186 
3187 	if (beentry->st_progress_command == PROGRESS_COMMAND_INVALID)
3188 		return;
3189 
3190 	PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
3191 	beentry->st_progress_command = PROGRESS_COMMAND_INVALID;
3192 	beentry->st_progress_command_target = InvalidOid;
3193 	PGSTAT_END_WRITE_ACTIVITY(beentry);
3194 }
3195 
3196 /* ----------
3197  * pgstat_report_appname() -
3198  *
3199  *	Called to update our application name.
3200  * ----------
3201  */
3202 void
pgstat_report_appname(const char * appname)3203 pgstat_report_appname(const char *appname)
3204 {
3205 	volatile PgBackendStatus *beentry = MyBEEntry;
3206 	int			len;
3207 
3208 	if (!beentry)
3209 		return;
3210 
3211 	/* This should be unnecessary if GUC did its job, but be safe */
3212 	len = pg_mbcliplen(appname, strlen(appname), NAMEDATALEN - 1);
3213 
3214 	/*
3215 	 * Update my status entry, following the protocol of bumping
3216 	 * st_changecount before and after.  We use a volatile pointer here to
3217 	 * ensure the compiler doesn't try to get cute.
3218 	 */
3219 	PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
3220 
3221 	memcpy((char *) beentry->st_appname, appname, len);
3222 	beentry->st_appname[len] = '\0';
3223 
3224 	PGSTAT_END_WRITE_ACTIVITY(beentry);
3225 }
3226 
3227 /*
3228  * Report current transaction start timestamp as the specified value.
3229  * Zero means there is no active transaction.
3230  */
3231 void
pgstat_report_xact_timestamp(TimestampTz tstamp)3232 pgstat_report_xact_timestamp(TimestampTz tstamp)
3233 {
3234 	volatile PgBackendStatus *beentry = MyBEEntry;
3235 
3236 	if (!pgstat_track_activities || !beentry)
3237 		return;
3238 
3239 	/*
3240 	 * Update my status entry, following the protocol of bumping
3241 	 * st_changecount before and after.  We use a volatile pointer here to
3242 	 * ensure the compiler doesn't try to get cute.
3243 	 */
3244 	PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
3245 
3246 	beentry->st_xact_start_timestamp = tstamp;
3247 
3248 	PGSTAT_END_WRITE_ACTIVITY(beentry);
3249 }
3250 
/* ----------
 * pgstat_read_current_status() -
 *
 *	Copy the current contents of the PgBackendStatus array to local memory,
 *	if not already done in this transaction.
 *
 *	If localBackendStatusTable is already set, this returns immediately.
 *	Otherwise it builds, in pgStatLocalContext, an array of
 *	LocalPgBackendStatus plus parallel buffers for each entry's application
 *	name, client hostname, activity string and (with USE_SSL) SSL status.
 *	Only slots whose st_procpid is > 0 are copied.  They are packed densely
 *	into the local table, counted in localNumBackends, and their pointer
 *	fields are redirected at the local buffers.
 *
 *	Must not be called from the stats collector process (asserted).
 * ----------
 */
static void
pgstat_read_current_status(void)
{
	volatile PgBackendStatus *beentry;
	LocalPgBackendStatus *localtable;
	LocalPgBackendStatus *localentry;
	char	   *localappname,
			   *localclienthostname,
			   *localactivity;
#ifdef USE_SSL
	PgBackendSSLStatus *localsslstatus;
#endif
	int			i;

	Assert(!pgStatRunningInCollector);
	if (localBackendStatusTable)
		return;					/* already done */

	/* presumably makes sure pgStatLocalContext exists -- TODO confirm */
	pgstat_setup_memcxt();

	/*
	 * Size every buffer for all NumBackendStatSlots slots; only in-use slots
	 * will actually be filled, packed at the front.
	 */
	localtable = (LocalPgBackendStatus *)
		MemoryContextAlloc(pgStatLocalContext,
						   sizeof(LocalPgBackendStatus) * NumBackendStatSlots);
	localappname = (char *)
		MemoryContextAlloc(pgStatLocalContext,
						   NAMEDATALEN * NumBackendStatSlots);
	localclienthostname = (char *)
		MemoryContextAlloc(pgStatLocalContext,
						   NAMEDATALEN * NumBackendStatSlots);
	localactivity = (char *)
		MemoryContextAlloc(pgStatLocalContext,
						   pgstat_track_activity_query_size * NumBackendStatSlots);
#ifdef USE_SSL
	localsslstatus = (PgBackendSSLStatus *)
		MemoryContextAlloc(pgStatLocalContext,
						   sizeof(PgBackendSSLStatus) * NumBackendStatSlots);
#endif

	localNumBackends = 0;

	/*
	 * Walk every shared slot.  i is 1-based because it is also handed to
	 * BackendIdGetTransactionIds() below.
	 */
	beentry = BackendStatusArray;
	localentry = localtable;
	for (i = 1; i <= NumBackendStatSlots; i++)
	{
		/*
		 * Follow the protocol of retrying if st_changecount changes while we
		 * copy the entry, or if it's odd.  (The check for odd is needed to
		 * cover the case where we are able to completely copy the entry while
		 * the source backend is between increment steps.)  We use a volatile
		 * pointer here to ensure the compiler doesn't try to get cute.
		 */
		for (;;)
		{
			int			before_changecount;
			int			after_changecount;

			/* sets before_changecount from the entry's st_changecount */
			pgstat_begin_read_activity(beentry, before_changecount);

			localentry->backendStatus.st_procpid = beentry->st_procpid;
			/* Skip all the data-copying work if entry is not in use */
			if (localentry->backendStatus.st_procpid > 0)
			{
				memcpy(&localentry->backendStatus, (char *) beentry, sizeof(PgBackendStatus));

				/*
				 * For each PgBackendStatus field that is a pointer, copy the
				 * pointed-to data, then adjust the local copy of the pointer
				 * field to point at the local copy of the data.
				 *
				 * strcpy is safe even if the string is modified concurrently,
				 * because there's always a \0 at the end of the buffer.
				 */
				strcpy(localappname, (char *) beentry->st_appname);
				localentry->backendStatus.st_appname = localappname;
				strcpy(localclienthostname, (char *) beentry->st_clienthostname);
				localentry->backendStatus.st_clienthostname = localclienthostname;
				strcpy(localactivity, (char *) beentry->st_activity);
				localentry->backendStatus.st_activity = localactivity;
#ifdef USE_SSL
				/*
				 * NOTE(review): when st_ssl is false, st_sslstatus in the
				 * local copy still holds the shared-memory pointer copied by
				 * the memcpy above; readers presumably check st_ssl first.
				 */
				if (beentry->st_ssl)
				{
					memcpy(localsslstatus, beentry->st_sslstatus, sizeof(PgBackendSSLStatus));
					localentry->backendStatus.st_sslstatus = localsslstatus;
				}
#endif
			}

			/* sets after_changecount; the copy is good only if it matches */
			pgstat_end_read_activity(beentry, after_changecount);

			if (pgstat_read_activity_complete(before_changecount,
											  after_changecount))
				break;

			/* Make sure we can break out of loop if stuck... */
			CHECK_FOR_INTERRUPTS();
		}

		beentry++;
		/* Only valid entries get included into the local array */
		if (localentry->backendStatus.st_procpid > 0)
		{
			/*
			 * Fetched outside the changecount protocol.  NOTE(review):
			 * assumes status slot i corresponds to BackendId i -- confirm.
			 */
			BackendIdGetTransactionIds(i,
									   &localentry->backend_xid,
									   &localentry->backend_xmin);

			/*
			 * Advance every local cursor only for kept entries, so an unused
			 * slot's scratch space is reused by the next slot.
			 */
			localentry++;
			localappname += NAMEDATALEN;
			localclienthostname += NAMEDATALEN;
			localactivity += pgstat_track_activity_query_size;
#ifdef USE_SSL
			localsslstatus++;
#endif
			localNumBackends++;
		}
	}

	/*
	 * Set the pointer only after completion of a valid table, so a
	 * partially built table is never published.
	 */
	localBackendStatusTable = localtable;
}
3377 
3378 /* ----------
3379  * pgstat_get_wait_event_type() -
3380  *
3381  *	Return a string representing the current wait event type, backend is
3382  *	waiting on.
3383  */
3384 const char *
pgstat_get_wait_event_type(uint32 wait_event_info)3385 pgstat_get_wait_event_type(uint32 wait_event_info)
3386 {
3387 	uint32		classId;
3388 	const char *event_type;
3389 
3390 	/* report process as not waiting. */
3391 	if (wait_event_info == 0)
3392 		return NULL;
3393 
3394 	classId = wait_event_info & 0xFF000000;
3395 
3396 	switch (classId)
3397 	{
3398 		case PG_WAIT_LWLOCK:
3399 			event_type = "LWLock";
3400 			break;
3401 		case PG_WAIT_LOCK:
3402 			event_type = "Lock";
3403 			break;
3404 		case PG_WAIT_BUFFER_PIN:
3405 			event_type = "BufferPin";
3406 			break;
3407 		case PG_WAIT_ACTIVITY:
3408 			event_type = "Activity";
3409 			break;
3410 		case PG_WAIT_CLIENT:
3411 			event_type = "Client";
3412 			break;
3413 		case PG_WAIT_EXTENSION:
3414 			event_type = "Extension";
3415 			break;
3416 		case PG_WAIT_IPC:
3417 			event_type = "IPC";
3418 			break;
3419 		case PG_WAIT_TIMEOUT:
3420 			event_type = "Timeout";
3421 			break;
3422 		case PG_WAIT_IO:
3423 			event_type = "IO";
3424 			break;
3425 		default:
3426 			event_type = "???";
3427 			break;
3428 	}
3429 
3430 	return event_type;
3431 }
3432 
3433 /* ----------
3434  * pgstat_get_wait_event() -
3435  *
3436  *	Return a string representing the current wait event, backend is
3437  *	waiting on.
3438  */
3439 const char *
pgstat_get_wait_event(uint32 wait_event_info)3440 pgstat_get_wait_event(uint32 wait_event_info)
3441 {
3442 	uint32		classId;
3443 	uint16		eventId;
3444 	const char *event_name;
3445 
3446 	/* report process as not waiting. */
3447 	if (wait_event_info == 0)
3448 		return NULL;
3449 
3450 	classId = wait_event_info & 0xFF000000;
3451 	eventId = wait_event_info & 0x0000FFFF;
3452 
3453 	switch (classId)
3454 	{
3455 		case PG_WAIT_LWLOCK:
3456 			event_name = GetLWLockIdentifier(classId, eventId);
3457 			break;
3458 		case PG_WAIT_LOCK:
3459 			event_name = GetLockNameFromTagType(eventId);
3460 			break;
3461 		case PG_WAIT_BUFFER_PIN:
3462 			event_name = "BufferPin";
3463 			break;
3464 		case PG_WAIT_ACTIVITY:
3465 			{
3466 				WaitEventActivity w = (WaitEventActivity) wait_event_info;
3467 
3468 				event_name = pgstat_get_wait_activity(w);
3469 				break;
3470 			}
3471 		case PG_WAIT_CLIENT:
3472 			{
3473 				WaitEventClient w = (WaitEventClient) wait_event_info;
3474 
3475 				event_name = pgstat_get_wait_client(w);
3476 				break;
3477 			}
3478 		case PG_WAIT_EXTENSION:
3479 			event_name = "Extension";
3480 			break;
3481 		case PG_WAIT_IPC:
3482 			{
3483 				WaitEventIPC w = (WaitEventIPC) wait_event_info;
3484 
3485 				event_name = pgstat_get_wait_ipc(w);
3486 				break;
3487 			}
3488 		case PG_WAIT_TIMEOUT:
3489 			{
3490 				WaitEventTimeout w = (WaitEventTimeout) wait_event_info;
3491 
3492 				event_name = pgstat_get_wait_timeout(w);
3493 				break;
3494 			}
3495 		case PG_WAIT_IO:
3496 			{
3497 				WaitEventIO w = (WaitEventIO) wait_event_info;
3498 
3499 				event_name = pgstat_get_wait_io(w);
3500 				break;
3501 			}
3502 		default:
3503 			event_name = "unknown wait event";
3504 			break;
3505 	}
3506 
3507 	return event_name;
3508 }
3509 
3510 /* ----------
3511  * pgstat_get_wait_activity() -
3512  *
3513  * Convert WaitEventActivity to string.
3514  * ----------
3515  */
3516 static const char *
pgstat_get_wait_activity(WaitEventActivity w)3517 pgstat_get_wait_activity(WaitEventActivity w)
3518 {
3519 	const char *event_name = "unknown wait event";
3520 
3521 	switch (w)
3522 	{
3523 		case WAIT_EVENT_ARCHIVER_MAIN:
3524 			event_name = "ArchiverMain";
3525 			break;
3526 		case WAIT_EVENT_AUTOVACUUM_MAIN:
3527 			event_name = "AutoVacuumMain";
3528 			break;
3529 		case WAIT_EVENT_BGWRITER_HIBERNATE:
3530 			event_name = "BgWriterHibernate";
3531 			break;
3532 		case WAIT_EVENT_BGWRITER_MAIN:
3533 			event_name = "BgWriterMain";
3534 			break;
3535 		case WAIT_EVENT_CHECKPOINTER_MAIN:
3536 			event_name = "CheckpointerMain";
3537 			break;
3538 		case WAIT_EVENT_LOGICAL_LAUNCHER_MAIN:
3539 			event_name = "LogicalLauncherMain";
3540 			break;
3541 		case WAIT_EVENT_LOGICAL_APPLY_MAIN:
3542 			event_name = "LogicalApplyMain";
3543 			break;
3544 		case WAIT_EVENT_PGSTAT_MAIN:
3545 			event_name = "PgStatMain";
3546 			break;
3547 		case WAIT_EVENT_RECOVERY_WAL_ALL:
3548 			event_name = "RecoveryWalAll";
3549 			break;
3550 		case WAIT_EVENT_RECOVERY_WAL_STREAM:
3551 			event_name = "RecoveryWalStream";
3552 			break;
3553 		case WAIT_EVENT_SYSLOGGER_MAIN:
3554 			event_name = "SysLoggerMain";
3555 			break;
3556 		case WAIT_EVENT_WAL_RECEIVER_MAIN:
3557 			event_name = "WalReceiverMain";
3558 			break;
3559 		case WAIT_EVENT_WAL_SENDER_MAIN:
3560 			event_name = "WalSenderMain";
3561 			break;
3562 		case WAIT_EVENT_WAL_WRITER_MAIN:
3563 			event_name = "WalWriterMain";
3564 			break;
3565 			/* no default case, so that compiler will warn */
3566 	}
3567 
3568 	return event_name;
3569 }
3570 
3571 /* ----------
3572  * pgstat_get_wait_client() -
3573  *
3574  * Convert WaitEventClient to string.
3575  * ----------
3576  */
3577 static const char *
pgstat_get_wait_client(WaitEventClient w)3578 pgstat_get_wait_client(WaitEventClient w)
3579 {
3580 	const char *event_name = "unknown wait event";
3581 
3582 	switch (w)
3583 	{
3584 		case WAIT_EVENT_CLIENT_READ:
3585 			event_name = "ClientRead";
3586 			break;
3587 		case WAIT_EVENT_CLIENT_WRITE:
3588 			event_name = "ClientWrite";
3589 			break;
3590 		case WAIT_EVENT_LIBPQWALRECEIVER_CONNECT:
3591 			event_name = "LibPQWalReceiverConnect";
3592 			break;
3593 		case WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE:
3594 			event_name = "LibPQWalReceiverReceive";
3595 			break;
3596 		case WAIT_EVENT_SSL_OPEN_SERVER:
3597 			event_name = "SSLOpenServer";
3598 			break;
3599 		case WAIT_EVENT_WAL_RECEIVER_WAIT_START:
3600 			event_name = "WalReceiverWaitStart";
3601 			break;
3602 		case WAIT_EVENT_WAL_SENDER_WAIT_WAL:
3603 			event_name = "WalSenderWaitForWAL";
3604 			break;
3605 		case WAIT_EVENT_WAL_SENDER_WRITE_DATA:
3606 			event_name = "WalSenderWriteData";
3607 			break;
3608 			/* no default case, so that compiler will warn */
3609 	}
3610 
3611 	return event_name;
3612 }
3613 
3614 /* ----------
3615  * pgstat_get_wait_ipc() -
3616  *
3617  * Convert WaitEventIPC to string.
3618  * ----------
3619  */
3620 static const char *
pgstat_get_wait_ipc(WaitEventIPC w)3621 pgstat_get_wait_ipc(WaitEventIPC w)
3622 {
3623 	const char *event_name = "unknown wait event";
3624 
3625 	switch (w)
3626 	{
3627 		case WAIT_EVENT_BGWORKER_SHUTDOWN:
3628 			event_name = "BgWorkerShutdown";
3629 			break;
3630 		case WAIT_EVENT_BGWORKER_STARTUP:
3631 			event_name = "BgWorkerStartup";
3632 			break;
3633 		case WAIT_EVENT_BTREE_PAGE:
3634 			event_name = "BtreePage";
3635 			break;
3636 		case WAIT_EVENT_EXECUTE_GATHER:
3637 			event_name = "ExecuteGather";
3638 			break;
3639 		case WAIT_EVENT_LOGICAL_SYNC_DATA:
3640 			event_name = "LogicalSyncData";
3641 			break;
3642 		case WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE:
3643 			event_name = "LogicalSyncStateChange";
3644 			break;
3645 		case WAIT_EVENT_MQ_INTERNAL:
3646 			event_name = "MessageQueueInternal";
3647 			break;
3648 		case WAIT_EVENT_MQ_PUT_MESSAGE:
3649 			event_name = "MessageQueuePutMessage";
3650 			break;
3651 		case WAIT_EVENT_MQ_RECEIVE:
3652 			event_name = "MessageQueueReceive";
3653 			break;
3654 		case WAIT_EVENT_MQ_SEND:
3655 			event_name = "MessageQueueSend";
3656 			break;
3657 		case WAIT_EVENT_PARALLEL_FINISH:
3658 			event_name = "ParallelFinish";
3659 			break;
3660 		case WAIT_EVENT_PARALLEL_BITMAP_SCAN:
3661 			event_name = "ParallelBitmapScan";
3662 			break;
3663 		case WAIT_EVENT_PROCARRAY_GROUP_UPDATE:
3664 			event_name = "ProcArrayGroupUpdate";
3665 			break;
3666 		case WAIT_EVENT_REPLICATION_ORIGIN_DROP:
3667 			event_name = "ReplicationOriginDrop";
3668 			break;
3669 		case WAIT_EVENT_REPLICATION_SLOT_DROP:
3670 			event_name = "ReplicationSlotDrop";
3671 			break;
3672 		case WAIT_EVENT_SAFE_SNAPSHOT:
3673 			event_name = "SafeSnapshot";
3674 			break;
3675 		case WAIT_EVENT_SYNC_REP:
3676 			event_name = "SyncRep";
3677 			break;
3678 			/* no default case, so that compiler will warn */
3679 	}
3680 
3681 	return event_name;
3682 }
3683 
3684 /* ----------
3685  * pgstat_get_wait_timeout() -
3686  *
3687  * Convert WaitEventTimeout to string.
3688  * ----------
3689  */
3690 static const char *
pgstat_get_wait_timeout(WaitEventTimeout w)3691 pgstat_get_wait_timeout(WaitEventTimeout w)
3692 {
3693 	const char *event_name = "unknown wait event";
3694 
3695 	switch (w)
3696 	{
3697 		case WAIT_EVENT_BASE_BACKUP_THROTTLE:
3698 			event_name = "BaseBackupThrottle";
3699 			break;
3700 		case WAIT_EVENT_PG_SLEEP:
3701 			event_name = "PgSleep";
3702 			break;
3703 		case WAIT_EVENT_RECOVERY_APPLY_DELAY:
3704 			event_name = "RecoveryApplyDelay";
3705 			break;
3706 			/* no default case, so that compiler will warn */
3707 	}
3708 
3709 	return event_name;
3710 }
3711 
3712 /* ----------
3713  * pgstat_get_wait_io() -
3714  *
3715  * Convert WaitEventIO to string.
3716  * ----------
3717  */
3718 static const char *
pgstat_get_wait_io(WaitEventIO w)3719 pgstat_get_wait_io(WaitEventIO w)
3720 {
3721 	const char *event_name = "unknown wait event";
3722 
3723 	switch (w)
3724 	{
3725 		case WAIT_EVENT_BUFFILE_READ:
3726 			event_name = "BufFileRead";
3727 			break;
3728 		case WAIT_EVENT_BUFFILE_WRITE:
3729 			event_name = "BufFileWrite";
3730 			break;
3731 		case WAIT_EVENT_CONTROL_FILE_READ:
3732 			event_name = "ControlFileRead";
3733 			break;
3734 		case WAIT_EVENT_CONTROL_FILE_SYNC:
3735 			event_name = "ControlFileSync";
3736 			break;
3737 		case WAIT_EVENT_CONTROL_FILE_SYNC_UPDATE:
3738 			event_name = "ControlFileSyncUpdate";
3739 			break;
3740 		case WAIT_EVENT_CONTROL_FILE_WRITE:
3741 			event_name = "ControlFileWrite";
3742 			break;
3743 		case WAIT_EVENT_CONTROL_FILE_WRITE_UPDATE:
3744 			event_name = "ControlFileWriteUpdate";
3745 			break;
3746 		case WAIT_EVENT_COPY_FILE_READ:
3747 			event_name = "CopyFileRead";
3748 			break;
3749 		case WAIT_EVENT_COPY_FILE_WRITE:
3750 			event_name = "CopyFileWrite";
3751 			break;
3752 		case WAIT_EVENT_DATA_FILE_EXTEND:
3753 			event_name = "DataFileExtend";
3754 			break;
3755 		case WAIT_EVENT_DATA_FILE_FLUSH:
3756 			event_name = "DataFileFlush";
3757 			break;
3758 		case WAIT_EVENT_DATA_FILE_IMMEDIATE_SYNC:
3759 			event_name = "DataFileImmediateSync";
3760 			break;
3761 		case WAIT_EVENT_DATA_FILE_PREFETCH:
3762 			event_name = "DataFilePrefetch";
3763 			break;
3764 		case WAIT_EVENT_DATA_FILE_READ:
3765 			event_name = "DataFileRead";
3766 			break;
3767 		case WAIT_EVENT_DATA_FILE_SYNC:
3768 			event_name = "DataFileSync";
3769 			break;
3770 		case WAIT_EVENT_DATA_FILE_TRUNCATE:
3771 			event_name = "DataFileTruncate";
3772 			break;
3773 		case WAIT_EVENT_DATA_FILE_WRITE:
3774 			event_name = "DataFileWrite";
3775 			break;
3776 		case WAIT_EVENT_DSM_FILL_ZERO_WRITE:
3777 			event_name = "DSMFillZeroWrite";
3778 			break;
3779 		case WAIT_EVENT_LOCK_FILE_ADDTODATADIR_READ:
3780 			event_name = "LockFileAddToDataDirRead";
3781 			break;
3782 		case WAIT_EVENT_LOCK_FILE_ADDTODATADIR_SYNC:
3783 			event_name = "LockFileAddToDataDirSync";
3784 			break;
3785 		case WAIT_EVENT_LOCK_FILE_ADDTODATADIR_WRITE:
3786 			event_name = "LockFileAddToDataDirWrite";
3787 			break;
3788 		case WAIT_EVENT_LOCK_FILE_CREATE_READ:
3789 			event_name = "LockFileCreateRead";
3790 			break;
3791 		case WAIT_EVENT_LOCK_FILE_CREATE_SYNC:
3792 			event_name = "LockFileCreateSync";
3793 			break;
3794 		case WAIT_EVENT_LOCK_FILE_CREATE_WRITE:
3795 			event_name = "LockFileCreateWrite";
3796 			break;
3797 		case WAIT_EVENT_LOCK_FILE_RECHECKDATADIR_READ:
3798 			event_name = "LockFileReCheckDataDirRead";
3799 			break;
3800 		case WAIT_EVENT_LOGICAL_REWRITE_CHECKPOINT_SYNC:
3801 			event_name = "LogicalRewriteCheckpointSync";
3802 			break;
3803 		case WAIT_EVENT_LOGICAL_REWRITE_MAPPING_SYNC:
3804 			event_name = "LogicalRewriteMappingSync";
3805 			break;
3806 		case WAIT_EVENT_LOGICAL_REWRITE_MAPPING_WRITE:
3807 			event_name = "LogicalRewriteMappingWrite";
3808 			break;
3809 		case WAIT_EVENT_LOGICAL_REWRITE_SYNC:
3810 			event_name = "LogicalRewriteSync";
3811 			break;
3812 		case WAIT_EVENT_LOGICAL_REWRITE_TRUNCATE:
3813 			event_name = "LogicalRewriteTruncate";
3814 			break;
3815 		case WAIT_EVENT_LOGICAL_REWRITE_WRITE:
3816 			event_name = "LogicalRewriteWrite";
3817 			break;
3818 		case WAIT_EVENT_RELATION_MAP_READ:
3819 			event_name = "RelationMapRead";
3820 			break;
3821 		case WAIT_EVENT_RELATION_MAP_SYNC:
3822 			event_name = "RelationMapSync";
3823 			break;
3824 		case WAIT_EVENT_RELATION_MAP_WRITE:
3825 			event_name = "RelationMapWrite";
3826 			break;
3827 		case WAIT_EVENT_REORDER_BUFFER_READ:
3828 			event_name = "ReorderBufferRead";
3829 			break;
3830 		case WAIT_EVENT_REORDER_BUFFER_WRITE:
3831 			event_name = "ReorderBufferWrite";
3832 			break;
3833 		case WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ:
3834 			event_name = "ReorderLogicalMappingRead";
3835 			break;
3836 		case WAIT_EVENT_REPLICATION_SLOT_READ:
3837 			event_name = "ReplicationSlotRead";
3838 			break;
3839 		case WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC:
3840 			event_name = "ReplicationSlotRestoreSync";
3841 			break;
3842 		case WAIT_EVENT_REPLICATION_SLOT_SYNC:
3843 			event_name = "ReplicationSlotSync";
3844 			break;
3845 		case WAIT_EVENT_REPLICATION_SLOT_WRITE:
3846 			event_name = "ReplicationSlotWrite";
3847 			break;
3848 		case WAIT_EVENT_SLRU_FLUSH_SYNC:
3849 			event_name = "SLRUFlushSync";
3850 			break;
3851 		case WAIT_EVENT_SLRU_READ:
3852 			event_name = "SLRURead";
3853 			break;
3854 		case WAIT_EVENT_SLRU_SYNC:
3855 			event_name = "SLRUSync";
3856 			break;
3857 		case WAIT_EVENT_SLRU_WRITE:
3858 			event_name = "SLRUWrite";
3859 			break;
3860 		case WAIT_EVENT_SNAPBUILD_READ:
3861 			event_name = "SnapbuildRead";
3862 			break;
3863 		case WAIT_EVENT_SNAPBUILD_SYNC:
3864 			event_name = "SnapbuildSync";
3865 			break;
3866 		case WAIT_EVENT_SNAPBUILD_WRITE:
3867 			event_name = "SnapbuildWrite";
3868 			break;
3869 		case WAIT_EVENT_TIMELINE_HISTORY_FILE_SYNC:
3870 			event_name = "TimelineHistoryFileSync";
3871 			break;
3872 		case WAIT_EVENT_TIMELINE_HISTORY_FILE_WRITE:
3873 			event_name = "TimelineHistoryFileWrite";
3874 			break;
3875 		case WAIT_EVENT_TIMELINE_HISTORY_READ:
3876 			event_name = "TimelineHistoryRead";
3877 			break;
3878 		case WAIT_EVENT_TIMELINE_HISTORY_SYNC:
3879 			event_name = "TimelineHistorySync";
3880 			break;
3881 		case WAIT_EVENT_TIMELINE_HISTORY_WRITE:
3882 			event_name = "TimelineHistoryWrite";
3883 			break;
3884 		case WAIT_EVENT_TWOPHASE_FILE_READ:
3885 			event_name = "TwophaseFileRead";
3886 			break;
3887 		case WAIT_EVENT_TWOPHASE_FILE_SYNC:
3888 			event_name = "TwophaseFileSync";
3889 			break;
3890 		case WAIT_EVENT_TWOPHASE_FILE_WRITE:
3891 			event_name = "TwophaseFileWrite";
3892 			break;
3893 		case WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ:
3894 			event_name = "WALSenderTimelineHistoryRead";
3895 			break;
3896 		case WAIT_EVENT_WAL_BOOTSTRAP_SYNC:
3897 			event_name = "WALBootstrapSync";
3898 			break;
3899 		case WAIT_EVENT_WAL_BOOTSTRAP_WRITE:
3900 			event_name = "WALBootstrapWrite";
3901 			break;
3902 		case WAIT_EVENT_WAL_COPY_READ:
3903 			event_name = "WALCopyRead";
3904 			break;
3905 		case WAIT_EVENT_WAL_COPY_SYNC:
3906 			event_name = "WALCopySync";
3907 			break;
3908 		case WAIT_EVENT_WAL_COPY_WRITE:
3909 			event_name = "WALCopyWrite";
3910 			break;
3911 		case WAIT_EVENT_WAL_INIT_SYNC:
3912 			event_name = "WALInitSync";
3913 			break;
3914 		case WAIT_EVENT_WAL_INIT_WRITE:
3915 			event_name = "WALInitWrite";
3916 			break;
3917 		case WAIT_EVENT_WAL_READ:
3918 			event_name = "WALRead";
3919 			break;
3920 		case WAIT_EVENT_WAL_SYNC_METHOD_ASSIGN:
3921 			event_name = "WALSyncMethodAssign";
3922 			break;
3923 		case WAIT_EVENT_WAL_WRITE:
3924 			event_name = "WALWrite";
3925 			break;
3926 
3927 			/* no default case, so that compiler will warn */
3928 	}
3929 
3930 	return event_name;
3931 }
3932 
3933 
3934 /* ----------
3935  * pgstat_get_backend_current_activity() -
3936  *
3937  *	Return a string representing the current activity of the backend with
3938  *	the specified PID.  This looks directly at the BackendStatusArray,
3939  *	and so will provide current information regardless of the age of our
3940  *	transaction's snapshot of the status array.
3941  *
3942  *	It is the caller's responsibility to invoke this only for backends whose
3943  *	state is expected to remain stable while the result is in use.  The
3944  *	only current use is in deadlock reporting, where we can expect that
3945  *	the target backend is blocked on a lock.  (There are corner cases
3946  *	where the target's wait could get aborted while we are looking at it,
3947  *	but the very worst consequence is to return a pointer to a string
3948  *	that's been changed, so we won't worry too much.)
3949  *
3950  *	Note: return strings for special cases match pg_stat_get_backend_activity.
3951  * ----------
3952  */
3953 const char *
pgstat_get_backend_current_activity(int pid,bool checkUser)3954 pgstat_get_backend_current_activity(int pid, bool checkUser)
3955 {
3956 	PgBackendStatus *beentry;
3957 	int			i;
3958 
3959 	beentry = BackendStatusArray;
3960 	for (i = 1; i <= MaxBackends; i++)
3961 	{
3962 		/*
3963 		 * Although we expect the target backend's entry to be stable, that
3964 		 * doesn't imply that anyone else's is.  To avoid identifying the
3965 		 * wrong backend, while we check for a match to the desired PID we
3966 		 * must follow the protocol of retrying if st_changecount changes
3967 		 * while we examine the entry, or if it's odd.  (This might be
3968 		 * unnecessary, since fetching or storing an int is almost certainly
3969 		 * atomic, but let's play it safe.)  We use a volatile pointer here to
3970 		 * ensure the compiler doesn't try to get cute.
3971 		 */
3972 		volatile PgBackendStatus *vbeentry = beentry;
3973 		bool		found;
3974 
3975 		for (;;)
3976 		{
3977 			int			before_changecount;
3978 			int			after_changecount;
3979 
3980 			pgstat_begin_read_activity(vbeentry, before_changecount);
3981 
3982 			found = (vbeentry->st_procpid == pid);
3983 
3984 			pgstat_end_read_activity(vbeentry, after_changecount);
3985 
3986 			if (pgstat_read_activity_complete(before_changecount,
3987 											  after_changecount))
3988 				break;
3989 
3990 			/* Make sure we can break out of loop if stuck... */
3991 			CHECK_FOR_INTERRUPTS();
3992 		}
3993 
3994 		if (found)
3995 		{
3996 			/* Now it is safe to use the non-volatile pointer */
3997 			if (checkUser && !superuser() && beentry->st_userid != GetUserId())
3998 				return "<insufficient privilege>";
3999 			else if (*(beentry->st_activity) == '\0')
4000 				return "<command string not enabled>";
4001 			else
4002 				return beentry->st_activity;
4003 		}
4004 
4005 		beentry++;
4006 	}
4007 
4008 	/* If we get here, caller is in error ... */
4009 	return "<backend information not available>";
4010 }
4011 
4012 /* ----------
4013  * pgstat_get_crashed_backend_activity() -
4014  *
4015  *	Return a string representing the current activity of the backend with
4016  *	the specified PID.  Like the function above, but reads shared memory with
4017  *	the expectation that it may be corrupt.  On success, copy the string
4018  *	into the "buffer" argument and return that pointer.  On failure,
4019  *	return NULL.
4020  *
4021  *	This function is only intended to be used by the postmaster to report the
4022  *	query that crashed a backend.  In particular, no attempt is made to
4023  *	follow the correct concurrency protocol when accessing the
4024  *	BackendStatusArray.  But that's OK, in the worst case we'll return a
4025  *	corrupted message.  We also must take care not to trip on ereport(ERROR).
4026  * ----------
4027  */
4028 const char *
pgstat_get_crashed_backend_activity(int pid,char * buffer,int buflen)4029 pgstat_get_crashed_backend_activity(int pid, char *buffer, int buflen)
4030 {
4031 	volatile PgBackendStatus *beentry;
4032 	int			i;
4033 
4034 	beentry = BackendStatusArray;
4035 
4036 	/*
4037 	 * We probably shouldn't get here before shared memory has been set up,
4038 	 * but be safe.
4039 	 */
4040 	if (beentry == NULL || BackendActivityBuffer == NULL)
4041 		return NULL;
4042 
4043 	for (i = 1; i <= MaxBackends; i++)
4044 	{
4045 		if (beentry->st_procpid == pid)
4046 		{
4047 			/* Read pointer just once, so it can't change after validation */
4048 			const char *activity = beentry->st_activity;
4049 			const char *activity_last;
4050 
4051 			/*
4052 			 * We mustn't access activity string before we verify that it
4053 			 * falls within the BackendActivityBuffer. To make sure that the
4054 			 * entire string including its ending is contained within the
4055 			 * buffer, subtract one activity length from the buffer size.
4056 			 */
4057 			activity_last = BackendActivityBuffer + BackendActivityBufferSize
4058 				- pgstat_track_activity_query_size;
4059 
4060 			if (activity < BackendActivityBuffer ||
4061 				activity > activity_last)
4062 				return NULL;
4063 
4064 			/* If no string available, no point in a report */
4065 			if (activity[0] == '\0')
4066 				return NULL;
4067 
4068 			/*
4069 			 * Copy only ASCII-safe characters so we don't run into encoding
4070 			 * problems when reporting the message; and be sure not to run off
4071 			 * the end of memory.
4072 			 */
4073 			ascii_safe_strlcpy(buffer, activity,
4074 							   Min(buflen, pgstat_track_activity_query_size));
4075 
4076 			return buffer;
4077 		}
4078 
4079 		beentry++;
4080 	}
4081 
4082 	/* PID not found */
4083 	return NULL;
4084 }
4085 
4086 const char *
pgstat_get_backend_desc(BackendType backendType)4087 pgstat_get_backend_desc(BackendType backendType)
4088 {
4089 	const char *backendDesc = "unknown process type";
4090 
4091 	switch (backendType)
4092 	{
4093 		case B_AUTOVAC_LAUNCHER:
4094 			backendDesc = "autovacuum launcher";
4095 			break;
4096 		case B_AUTOVAC_WORKER:
4097 			backendDesc = "autovacuum worker";
4098 			break;
4099 		case B_BACKEND:
4100 			backendDesc = "client backend";
4101 			break;
4102 		case B_BG_WORKER:
4103 			backendDesc = "background worker";
4104 			break;
4105 		case B_BG_WRITER:
4106 			backendDesc = "background writer";
4107 			break;
4108 		case B_CHECKPOINTER:
4109 			backendDesc = "checkpointer";
4110 			break;
4111 		case B_STARTUP:
4112 			backendDesc = "startup";
4113 			break;
4114 		case B_WAL_RECEIVER:
4115 			backendDesc = "walreceiver";
4116 			break;
4117 		case B_WAL_SENDER:
4118 			backendDesc = "walsender";
4119 			break;
4120 		case B_WAL_WRITER:
4121 			backendDesc = "walwriter";
4122 			break;
4123 	}
4124 
4125 	return backendDesc;
4126 }
4127 
4128 /* ------------------------------------------------------------
4129  * Local support functions follow
4130  * ------------------------------------------------------------
4131  */
4132 
4133 
4134 /* ----------
4135  * pgstat_setheader() -
4136  *
4137  *		Set common header fields in a statistics message
4138  * ----------
4139  */
4140 static void
pgstat_setheader(PgStat_MsgHdr * hdr,StatMsgType mtype)4141 pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype)
4142 {
4143 	hdr->m_type = mtype;
4144 }
4145 
4146 
4147 /* ----------
4148  * pgstat_send() -
4149  *
4150  *		Send out one statistics message to the collector
4151  * ----------
4152  */
4153 static void
pgstat_send(void * msg,int len)4154 pgstat_send(void *msg, int len)
4155 {
4156 	int			rc;
4157 
4158 	if (pgStatSock == PGINVALID_SOCKET)
4159 		return;
4160 
4161 	((PgStat_MsgHdr *) msg)->m_size = len;
4162 
4163 	/* We'll retry after EINTR, but ignore all other failures */
4164 	do
4165 	{
4166 		rc = send(pgStatSock, msg, len, 0);
4167 	} while (rc < 0 && errno == EINTR);
4168 
4169 #ifdef USE_ASSERT_CHECKING
4170 	/* In debug builds, log send failures ... */
4171 	if (rc < 0)
4172 		elog(LOG, "could not send to statistics collector: %m");
4173 #endif
4174 }
4175 
4176 /* ----------
4177  * pgstat_send_archiver() -
4178  *
4179  *	Tell the collector about the WAL file that we successfully
4180  *	archived or failed to archive.
4181  * ----------
4182  */
4183 void
pgstat_send_archiver(const char * xlog,bool failed)4184 pgstat_send_archiver(const char *xlog, bool failed)
4185 {
4186 	PgStat_MsgArchiver msg;
4187 
4188 	/*
4189 	 * Prepare and send the message
4190 	 */
4191 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ARCHIVER);
4192 	msg.m_failed = failed;
4193 	StrNCpy(msg.m_xlog, xlog, sizeof(msg.m_xlog));
4194 	msg.m_timestamp = GetCurrentTimestamp();
4195 	pgstat_send(&msg, sizeof(msg));
4196 }
4197 
4198 /* ----------
4199  * pgstat_send_bgwriter() -
4200  *
4201  *		Send bgwriter statistics to the collector
4202  * ----------
4203  */
4204 void
pgstat_send_bgwriter(void)4205 pgstat_send_bgwriter(void)
4206 {
4207 	/* We assume this initializes to zeroes */
4208 	static const PgStat_MsgBgWriter all_zeroes;
4209 
4210 	/*
4211 	 * This function can be called even if nothing at all has happened. In
4212 	 * this case, avoid sending a completely empty message to the stats
4213 	 * collector.
4214 	 */
4215 	if (memcmp(&BgWriterStats, &all_zeroes, sizeof(PgStat_MsgBgWriter)) == 0)
4216 		return;
4217 
4218 	/*
4219 	 * Prepare and send the message
4220 	 */
4221 	pgstat_setheader(&BgWriterStats.m_hdr, PGSTAT_MTYPE_BGWRITER);
4222 	pgstat_send(&BgWriterStats, sizeof(BgWriterStats));
4223 
4224 	/*
4225 	 * Clear out the statistics buffer, so it can be re-used.
4226 	 */
4227 	MemSet(&BgWriterStats, 0, sizeof(BgWriterStats));
4228 }
4229 
4230 
4231 /* ----------
4232  * PgstatCollectorMain() -
4233  *
4234  *	Start up the statistics collector process.  This is the body of the
4235  *	postmaster child process.
4236  *
4237  *	The argc/argv parameters are valid only in EXEC_BACKEND case.
4238  * ----------
4239  */
4240 NON_EXEC_STATIC void
PgstatCollectorMain(int argc,char * argv[])4241 PgstatCollectorMain(int argc, char *argv[])
4242 {
4243 	int			len;
4244 	PgStat_Msg	msg;
4245 	int			wr;
4246 
4247 	/*
4248 	 * Ignore all signals usually bound to some action in the postmaster,
4249 	 * except SIGHUP and SIGQUIT.  Note we don't need a SIGUSR1 handler to
4250 	 * support latch operations, because we only use a local latch.
4251 	 */
4252 	pqsignal(SIGHUP, pgstat_sighup_handler);
4253 	pqsignal(SIGINT, SIG_IGN);
4254 	pqsignal(SIGTERM, SIG_IGN);
4255 	pqsignal(SIGQUIT, pgstat_exit);
4256 	pqsignal(SIGALRM, SIG_IGN);
4257 	pqsignal(SIGPIPE, SIG_IGN);
4258 	pqsignal(SIGUSR1, SIG_IGN);
4259 	pqsignal(SIGUSR2, SIG_IGN);
4260 	pqsignal(SIGCHLD, SIG_DFL);
4261 	pqsignal(SIGTTIN, SIG_DFL);
4262 	pqsignal(SIGTTOU, SIG_DFL);
4263 	pqsignal(SIGCONT, SIG_DFL);
4264 	pqsignal(SIGWINCH, SIG_DFL);
4265 	PG_SETMASK(&UnBlockSig);
4266 
4267 	/*
4268 	 * Identify myself via ps
4269 	 */
4270 	init_ps_display("stats collector process", "", "", "");
4271 
4272 	/*
4273 	 * Read in existing stats files or initialize the stats to zero.
4274 	 */
4275 	pgStatRunningInCollector = true;
4276 	pgStatDBHash = pgstat_read_statsfiles(InvalidOid, true, true);
4277 
4278 	/*
4279 	 * Loop to process messages until we get SIGQUIT or detect ungraceful
4280 	 * death of our parent postmaster.
4281 	 *
4282 	 * For performance reasons, we don't want to do ResetLatch/WaitLatch after
4283 	 * every message; instead, do that only after a recv() fails to obtain a
4284 	 * message.  (This effectively means that if backends are sending us stuff
4285 	 * like mad, we won't notice postmaster death until things slack off a
4286 	 * bit; which seems fine.)	To do that, we have an inner loop that
4287 	 * iterates as long as recv() succeeds.  We do recognize got_SIGHUP inside
4288 	 * the inner loop, which means that such interrupts will get serviced but
4289 	 * the latch won't get cleared until next time there is a break in the
4290 	 * action.
4291 	 */
4292 	for (;;)
4293 	{
4294 		/* Clear any already-pending wakeups */
4295 		ResetLatch(MyLatch);
4296 
4297 		/*
4298 		 * Quit if we get SIGQUIT from the postmaster.
4299 		 */
4300 		if (need_exit)
4301 			break;
4302 
4303 		/*
4304 		 * Inner loop iterates as long as we keep getting messages, or until
4305 		 * need_exit becomes set.
4306 		 */
4307 		while (!need_exit)
4308 		{
4309 			/*
4310 			 * Reload configuration if we got SIGHUP from the postmaster.
4311 			 */
4312 			if (got_SIGHUP)
4313 			{
4314 				got_SIGHUP = false;
4315 				ProcessConfigFile(PGC_SIGHUP);
4316 			}
4317 
4318 			/*
4319 			 * Write the stats file(s) if a new request has arrived that is
4320 			 * not satisfied by existing file(s).
4321 			 */
4322 			if (pgstat_write_statsfile_needed())
4323 				pgstat_write_statsfiles(false, false);
4324 
4325 			/*
4326 			 * Try to receive and process a message.  This will not block,
4327 			 * since the socket is set to non-blocking mode.
4328 			 *
4329 			 * XXX On Windows, we have to force pgwin32_recv to cooperate,
4330 			 * despite the previous use of pg_set_noblock() on the socket.
4331 			 * This is extremely broken and should be fixed someday.
4332 			 */
4333 #ifdef WIN32
4334 			pgwin32_noblock = 1;
4335 #endif
4336 
4337 			len = recv(pgStatSock, (char *) &msg,
4338 					   sizeof(PgStat_Msg), 0);
4339 
4340 #ifdef WIN32
4341 			pgwin32_noblock = 0;
4342 #endif
4343 
4344 			if (len < 0)
4345 			{
4346 				if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
4347 					break;		/* out of inner loop */
4348 				ereport(ERROR,
4349 						(errcode_for_socket_access(),
4350 						 errmsg("could not read statistics message: %m")));
4351 			}
4352 
4353 			/*
4354 			 * We ignore messages that are smaller than our common header
4355 			 */
4356 			if (len < sizeof(PgStat_MsgHdr))
4357 				continue;
4358 
4359 			/*
4360 			 * The received length must match the length in the header
4361 			 */
4362 			if (msg.msg_hdr.m_size != len)
4363 				continue;
4364 
4365 			/*
4366 			 * O.K. - we accept this message.  Process it.
4367 			 */
4368 			switch (msg.msg_hdr.m_type)
4369 			{
4370 				case PGSTAT_MTYPE_DUMMY:
4371 					break;
4372 
4373 				case PGSTAT_MTYPE_INQUIRY:
4374 					pgstat_recv_inquiry((PgStat_MsgInquiry *) &msg, len);
4375 					break;
4376 
4377 				case PGSTAT_MTYPE_TABSTAT:
4378 					pgstat_recv_tabstat((PgStat_MsgTabstat *) &msg, len);
4379 					break;
4380 
4381 				case PGSTAT_MTYPE_TABPURGE:
4382 					pgstat_recv_tabpurge((PgStat_MsgTabpurge *) &msg, len);
4383 					break;
4384 
4385 				case PGSTAT_MTYPE_DROPDB:
4386 					pgstat_recv_dropdb((PgStat_MsgDropdb *) &msg, len);
4387 					break;
4388 
4389 				case PGSTAT_MTYPE_RESETCOUNTER:
4390 					pgstat_recv_resetcounter((PgStat_MsgResetcounter *) &msg,
4391 											 len);
4392 					break;
4393 
4394 				case PGSTAT_MTYPE_RESETSHAREDCOUNTER:
4395 					pgstat_recv_resetsharedcounter(
4396 												   (PgStat_MsgResetsharedcounter *) &msg,
4397 												   len);
4398 					break;
4399 
4400 				case PGSTAT_MTYPE_RESETSINGLECOUNTER:
4401 					pgstat_recv_resetsinglecounter(
4402 												   (PgStat_MsgResetsinglecounter *) &msg,
4403 												   len);
4404 					break;
4405 
4406 				case PGSTAT_MTYPE_AUTOVAC_START:
4407 					pgstat_recv_autovac((PgStat_MsgAutovacStart *) &msg, len);
4408 					break;
4409 
4410 				case PGSTAT_MTYPE_VACUUM:
4411 					pgstat_recv_vacuum((PgStat_MsgVacuum *) &msg, len);
4412 					break;
4413 
4414 				case PGSTAT_MTYPE_ANALYZE:
4415 					pgstat_recv_analyze((PgStat_MsgAnalyze *) &msg, len);
4416 					break;
4417 
4418 				case PGSTAT_MTYPE_ARCHIVER:
4419 					pgstat_recv_archiver((PgStat_MsgArchiver *) &msg, len);
4420 					break;
4421 
4422 				case PGSTAT_MTYPE_BGWRITER:
4423 					pgstat_recv_bgwriter((PgStat_MsgBgWriter *) &msg, len);
4424 					break;
4425 
4426 				case PGSTAT_MTYPE_FUNCSTAT:
4427 					pgstat_recv_funcstat((PgStat_MsgFuncstat *) &msg, len);
4428 					break;
4429 
4430 				case PGSTAT_MTYPE_FUNCPURGE:
4431 					pgstat_recv_funcpurge((PgStat_MsgFuncpurge *) &msg, len);
4432 					break;
4433 
4434 				case PGSTAT_MTYPE_RECOVERYCONFLICT:
4435 					pgstat_recv_recoveryconflict((PgStat_MsgRecoveryConflict *) &msg, len);
4436 					break;
4437 
4438 				case PGSTAT_MTYPE_DEADLOCK:
4439 					pgstat_recv_deadlock((PgStat_MsgDeadlock *) &msg, len);
4440 					break;
4441 
4442 				case PGSTAT_MTYPE_TEMPFILE:
4443 					pgstat_recv_tempfile((PgStat_MsgTempFile *) &msg, len);
4444 					break;
4445 
4446 				default:
4447 					break;
4448 			}
4449 		}						/* end of inner message-processing loop */
4450 
4451 		/* Sleep until there's something to do */
4452 #ifndef WIN32
4453 		wr = WaitLatchOrSocket(MyLatch,
4454 							   WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_SOCKET_READABLE,
4455 							   pgStatSock, -1L,
4456 							   WAIT_EVENT_PGSTAT_MAIN);
4457 #else
4458 
4459 		/*
4460 		 * Windows, at least in its Windows Server 2003 R2 incarnation,
4461 		 * sometimes loses FD_READ events.  Waking up and retrying the recv()
4462 		 * fixes that, so don't sleep indefinitely.  This is a crock of the
4463 		 * first water, but until somebody wants to debug exactly what's
4464 		 * happening there, this is the best we can do.  The two-second
4465 		 * timeout matches our pre-9.2 behavior, and needs to be short enough
4466 		 * to not provoke "using stale statistics" complaints from
4467 		 * backend_read_statsfile.
4468 		 */
4469 		wr = WaitLatchOrSocket(MyLatch,
4470 							   WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_SOCKET_READABLE | WL_TIMEOUT,
4471 							   pgStatSock,
4472 							   2 * 1000L /* msec */ ,
4473 							   WAIT_EVENT_PGSTAT_MAIN);
4474 #endif
4475 
4476 		/*
4477 		 * Emergency bailout if postmaster has died.  This is to avoid the
4478 		 * necessity for manual cleanup of all postmaster children.
4479 		 */
4480 		if (wr & WL_POSTMASTER_DEATH)
4481 			break;
4482 	}							/* end of outer loop */
4483 
4484 	/*
4485 	 * Save the final stats to reuse at next startup.
4486 	 */
4487 	pgstat_write_statsfiles(true, true);
4488 
4489 	exit(0);
4490 }
4491 
4492 
4493 /* SIGQUIT signal handler for collector process */
4494 static void
pgstat_exit(SIGNAL_ARGS)4495 pgstat_exit(SIGNAL_ARGS)
4496 {
4497 	int			save_errno = errno;
4498 
4499 	need_exit = true;
4500 	SetLatch(MyLatch);
4501 
4502 	errno = save_errno;
4503 }
4504 
4505 /* SIGHUP handler for collector process */
4506 static void
pgstat_sighup_handler(SIGNAL_ARGS)4507 pgstat_sighup_handler(SIGNAL_ARGS)
4508 {
4509 	int			save_errno = errno;
4510 
4511 	got_SIGHUP = true;
4512 	SetLatch(MyLatch);
4513 
4514 	errno = save_errno;
4515 }
4516 
4517 /*
4518  * Subroutine to clear stats in a database entry
4519  *
4520  * Tables and functions hashes are initialized to empty.
4521  */
4522 static void
reset_dbentry_counters(PgStat_StatDBEntry * dbentry)4523 reset_dbentry_counters(PgStat_StatDBEntry *dbentry)
4524 {
4525 	HASHCTL		hash_ctl;
4526 
4527 	dbentry->n_xact_commit = 0;
4528 	dbentry->n_xact_rollback = 0;
4529 	dbentry->n_blocks_fetched = 0;
4530 	dbentry->n_blocks_hit = 0;
4531 	dbentry->n_tuples_returned = 0;
4532 	dbentry->n_tuples_fetched = 0;
4533 	dbentry->n_tuples_inserted = 0;
4534 	dbentry->n_tuples_updated = 0;
4535 	dbentry->n_tuples_deleted = 0;
4536 	dbentry->last_autovac_time = 0;
4537 	dbentry->n_conflict_tablespace = 0;
4538 	dbentry->n_conflict_lock = 0;
4539 	dbentry->n_conflict_snapshot = 0;
4540 	dbentry->n_conflict_bufferpin = 0;
4541 	dbentry->n_conflict_startup_deadlock = 0;
4542 	dbentry->n_temp_files = 0;
4543 	dbentry->n_temp_bytes = 0;
4544 	dbentry->n_deadlocks = 0;
4545 	dbentry->n_block_read_time = 0;
4546 	dbentry->n_block_write_time = 0;
4547 
4548 	dbentry->stat_reset_timestamp = GetCurrentTimestamp();
4549 	dbentry->stats_timestamp = 0;
4550 
4551 	memset(&hash_ctl, 0, sizeof(hash_ctl));
4552 	hash_ctl.keysize = sizeof(Oid);
4553 	hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
4554 	dbentry->tables = hash_create("Per-database table",
4555 								  PGSTAT_TAB_HASH_SIZE,
4556 								  &hash_ctl,
4557 								  HASH_ELEM | HASH_BLOBS);
4558 
4559 	hash_ctl.keysize = sizeof(Oid);
4560 	hash_ctl.entrysize = sizeof(PgStat_StatFuncEntry);
4561 	dbentry->functions = hash_create("Per-database function",
4562 									 PGSTAT_FUNCTION_HASH_SIZE,
4563 									 &hash_ctl,
4564 									 HASH_ELEM | HASH_BLOBS);
4565 }
4566 
4567 /*
4568  * Lookup the hash table entry for the specified database. If no hash
4569  * table entry exists, initialize it, if the create parameter is true.
4570  * Else, return NULL.
4571  */
4572 static PgStat_StatDBEntry *
pgstat_get_db_entry(Oid databaseid,bool create)4573 pgstat_get_db_entry(Oid databaseid, bool create)
4574 {
4575 	PgStat_StatDBEntry *result;
4576 	bool		found;
4577 	HASHACTION	action = (create ? HASH_ENTER : HASH_FIND);
4578 
4579 	/* Lookup or create the hash table entry for this database */
4580 	result = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
4581 												&databaseid,
4582 												action, &found);
4583 
4584 	if (!create && !found)
4585 		return NULL;
4586 
4587 	/*
4588 	 * If not found, initialize the new one.  This creates empty hash tables
4589 	 * for tables and functions, too.
4590 	 */
4591 	if (!found)
4592 		reset_dbentry_counters(result);
4593 
4594 	return result;
4595 }
4596 
4597 
4598 /*
4599  * Lookup the hash table entry for the specified table. If no hash
4600  * table entry exists, initialize it, if the create parameter is true.
4601  * Else, return NULL.
4602  */
4603 static PgStat_StatTabEntry *
pgstat_get_tab_entry(PgStat_StatDBEntry * dbentry,Oid tableoid,bool create)4604 pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, Oid tableoid, bool create)
4605 {
4606 	PgStat_StatTabEntry *result;
4607 	bool		found;
4608 	HASHACTION	action = (create ? HASH_ENTER : HASH_FIND);
4609 
4610 	/* Lookup or create the hash table entry for this table */
4611 	result = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
4612 												 &tableoid,
4613 												 action, &found);
4614 
4615 	if (!create && !found)
4616 		return NULL;
4617 
4618 	/* If not found, initialize the new one. */
4619 	if (!found)
4620 	{
4621 		result->numscans = 0;
4622 		result->tuples_returned = 0;
4623 		result->tuples_fetched = 0;
4624 		result->tuples_inserted = 0;
4625 		result->tuples_updated = 0;
4626 		result->tuples_deleted = 0;
4627 		result->tuples_hot_updated = 0;
4628 		result->n_live_tuples = 0;
4629 		result->n_dead_tuples = 0;
4630 		result->changes_since_analyze = 0;
4631 		result->blocks_fetched = 0;
4632 		result->blocks_hit = 0;
4633 		result->vacuum_timestamp = 0;
4634 		result->vacuum_count = 0;
4635 		result->autovac_vacuum_timestamp = 0;
4636 		result->autovac_vacuum_count = 0;
4637 		result->analyze_timestamp = 0;
4638 		result->analyze_count = 0;
4639 		result->autovac_analyze_timestamp = 0;
4640 		result->autovac_analyze_count = 0;
4641 	}
4642 
4643 	return result;
4644 }
4645 
4646 
4647 /* ----------
4648  * pgstat_write_statsfiles() -
4649  *		Write the global statistics file, as well as requested DB files.
4650  *
4651  *	'permanent' specifies writing to the permanent files not temporary ones.
4652  *	When true (happens only when the collector is shutting down), also remove
4653  *	the temporary files so that backends starting up under a new postmaster
4654  *	can't read old data before the new collector is ready.
4655  *
4656  *	When 'allDbs' is false, only the requested databases (listed in
4657  *	pending_write_requests) will be written; otherwise, all databases
4658  *	will be written.
4659  * ----------
4660  */
4661 static void
pgstat_write_statsfiles(bool permanent,bool allDbs)4662 pgstat_write_statsfiles(bool permanent, bool allDbs)
4663 {
4664 	HASH_SEQ_STATUS hstat;
4665 	PgStat_StatDBEntry *dbentry;
4666 	FILE	   *fpout;
4667 	int32		format_id;
4668 	const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname;
4669 	const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
4670 	int			rc;
4671 
4672 	elog(DEBUG2, "writing stats file \"%s\"", statfile);
4673 
4674 	/*
4675 	 * Open the statistics temp file to write out the current values.
4676 	 */
4677 	fpout = AllocateFile(tmpfile, PG_BINARY_W);
4678 	if (fpout == NULL)
4679 	{
4680 		ereport(LOG,
4681 				(errcode_for_file_access(),
4682 				 errmsg("could not open temporary statistics file \"%s\": %m",
4683 						tmpfile)));
4684 		return;
4685 	}
4686 
4687 	/*
4688 	 * Set the timestamp of the stats file.
4689 	 */
4690 	globalStats.stats_timestamp = GetCurrentTimestamp();
4691 
4692 	/*
4693 	 * Write the file header --- currently just a format ID.
4694 	 */
4695 	format_id = PGSTAT_FILE_FORMAT_ID;
4696 	rc = fwrite(&format_id, sizeof(format_id), 1, fpout);
4697 	(void) rc;					/* we'll check for error with ferror */
4698 
4699 	/*
4700 	 * Write global stats struct
4701 	 */
4702 	rc = fwrite(&globalStats, sizeof(globalStats), 1, fpout);
4703 	(void) rc;					/* we'll check for error with ferror */
4704 
4705 	/*
4706 	 * Write archiver stats struct
4707 	 */
4708 	rc = fwrite(&archiverStats, sizeof(archiverStats), 1, fpout);
4709 	(void) rc;					/* we'll check for error with ferror */
4710 
4711 	/*
4712 	 * Walk through the database table.
4713 	 */
4714 	hash_seq_init(&hstat, pgStatDBHash);
4715 	while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
4716 	{
4717 		/*
4718 		 * Write out the table and function stats for this DB into the
4719 		 * appropriate per-DB stat file, if required.
4720 		 */
4721 		if (allDbs || pgstat_db_requested(dbentry->databaseid))
4722 		{
4723 			/* Make DB's timestamp consistent with the global stats */
4724 			dbentry->stats_timestamp = globalStats.stats_timestamp;
4725 
4726 			pgstat_write_db_statsfile(dbentry, permanent);
4727 		}
4728 
4729 		/*
4730 		 * Write out the DB entry. We don't write the tables or functions
4731 		 * pointers, since they're of no use to any other process.
4732 		 */
4733 		fputc('D', fpout);
4734 		rc = fwrite(dbentry, offsetof(PgStat_StatDBEntry, tables), 1, fpout);
4735 		(void) rc;				/* we'll check for error with ferror */
4736 	}
4737 
4738 	/*
4739 	 * No more output to be done. Close the temp file and replace the old
4740 	 * pgstat.stat with it.  The ferror() check replaces testing for error
4741 	 * after each individual fputc or fwrite above.
4742 	 */
4743 	fputc('E', fpout);
4744 
4745 	if (ferror(fpout))
4746 	{
4747 		ereport(LOG,
4748 				(errcode_for_file_access(),
4749 				 errmsg("could not write temporary statistics file \"%s\": %m",
4750 						tmpfile)));
4751 		FreeFile(fpout);
4752 		unlink(tmpfile);
4753 	}
4754 	else if (FreeFile(fpout) < 0)
4755 	{
4756 		ereport(LOG,
4757 				(errcode_for_file_access(),
4758 				 errmsg("could not close temporary statistics file \"%s\": %m",
4759 						tmpfile)));
4760 		unlink(tmpfile);
4761 	}
4762 	else if (rename(tmpfile, statfile) < 0)
4763 	{
4764 		ereport(LOG,
4765 				(errcode_for_file_access(),
4766 				 errmsg("could not rename temporary statistics file \"%s\" to \"%s\": %m",
4767 						tmpfile, statfile)));
4768 		unlink(tmpfile);
4769 	}
4770 
4771 	if (permanent)
4772 		unlink(pgstat_stat_filename);
4773 
4774 	/*
4775 	 * Now throw away the list of requests.  Note that requests sent after we
4776 	 * started the write are still waiting on the network socket.
4777 	 */
4778 	list_free(pending_write_requests);
4779 	pending_write_requests = NIL;
4780 }
4781 
4782 /*
4783  * return the filename for a DB stat file; filename is the output buffer,
4784  * of length len.
4785  */
4786 static void
get_dbstat_filename(bool permanent,bool tempname,Oid databaseid,char * filename,int len)4787 get_dbstat_filename(bool permanent, bool tempname, Oid databaseid,
4788 					char *filename, int len)
4789 {
4790 	int			printed;
4791 
4792 	/* NB -- pgstat_reset_remove_files knows about the pattern this uses */
4793 	printed = snprintf(filename, len, "%s/db_%u.%s",
4794 					   permanent ? PGSTAT_STAT_PERMANENT_DIRECTORY :
4795 					   pgstat_stat_directory,
4796 					   databaseid,
4797 					   tempname ? "tmp" : "stat");
4798 	if (printed >= len)
4799 		elog(ERROR, "overlength pgstat path");
4800 }
4801 
4802 /* ----------
4803  * pgstat_write_db_statsfile() -
4804  *		Write the stat file for a single database.
4805  *
4806  *	If writing to the permanent file (happens when the collector is
4807  *	shutting down only), remove the temporary file so that backends
4808  *	starting up under a new postmaster can't read the old data before
4809  *	the new collector is ready.
4810  * ----------
4811  */
4812 static void
pgstat_write_db_statsfile(PgStat_StatDBEntry * dbentry,bool permanent)4813 pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent)
4814 {
4815 	HASH_SEQ_STATUS tstat;
4816 	HASH_SEQ_STATUS fstat;
4817 	PgStat_StatTabEntry *tabentry;
4818 	PgStat_StatFuncEntry *funcentry;
4819 	FILE	   *fpout;
4820 	int32		format_id;
4821 	Oid			dbid = dbentry->databaseid;
4822 	int			rc;
4823 	char		tmpfile[MAXPGPATH];
4824 	char		statfile[MAXPGPATH];
4825 
4826 	get_dbstat_filename(permanent, true, dbid, tmpfile, MAXPGPATH);
4827 	get_dbstat_filename(permanent, false, dbid, statfile, MAXPGPATH);
4828 
4829 	elog(DEBUG2, "writing stats file \"%s\"", statfile);
4830 
4831 	/*
4832 	 * Open the statistics temp file to write out the current values.
4833 	 */
4834 	fpout = AllocateFile(tmpfile, PG_BINARY_W);
4835 	if (fpout == NULL)
4836 	{
4837 		ereport(LOG,
4838 				(errcode_for_file_access(),
4839 				 errmsg("could not open temporary statistics file \"%s\": %m",
4840 						tmpfile)));
4841 		return;
4842 	}
4843 
4844 	/*
4845 	 * Write the file header --- currently just a format ID.
4846 	 */
4847 	format_id = PGSTAT_FILE_FORMAT_ID;
4848 	rc = fwrite(&format_id, sizeof(format_id), 1, fpout);
4849 	(void) rc;					/* we'll check for error with ferror */
4850 
4851 	/*
4852 	 * Walk through the database's access stats per table.
4853 	 */
4854 	hash_seq_init(&tstat, dbentry->tables);
4855 	while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&tstat)) != NULL)
4856 	{
4857 		fputc('T', fpout);
4858 		rc = fwrite(tabentry, sizeof(PgStat_StatTabEntry), 1, fpout);
4859 		(void) rc;				/* we'll check for error with ferror */
4860 	}
4861 
4862 	/*
4863 	 * Walk through the database's function stats table.
4864 	 */
4865 	hash_seq_init(&fstat, dbentry->functions);
4866 	while ((funcentry = (PgStat_StatFuncEntry *) hash_seq_search(&fstat)) != NULL)
4867 	{
4868 		fputc('F', fpout);
4869 		rc = fwrite(funcentry, sizeof(PgStat_StatFuncEntry), 1, fpout);
4870 		(void) rc;				/* we'll check for error with ferror */
4871 	}
4872 
4873 	/*
4874 	 * No more output to be done. Close the temp file and replace the old
4875 	 * pgstat.stat with it.  The ferror() check replaces testing for error
4876 	 * after each individual fputc or fwrite above.
4877 	 */
4878 	fputc('E', fpout);
4879 
4880 	if (ferror(fpout))
4881 	{
4882 		ereport(LOG,
4883 				(errcode_for_file_access(),
4884 				 errmsg("could not write temporary statistics file \"%s\": %m",
4885 						tmpfile)));
4886 		FreeFile(fpout);
4887 		unlink(tmpfile);
4888 	}
4889 	else if (FreeFile(fpout) < 0)
4890 	{
4891 		ereport(LOG,
4892 				(errcode_for_file_access(),
4893 				 errmsg("could not close temporary statistics file \"%s\": %m",
4894 						tmpfile)));
4895 		unlink(tmpfile);
4896 	}
4897 	else if (rename(tmpfile, statfile) < 0)
4898 	{
4899 		ereport(LOG,
4900 				(errcode_for_file_access(),
4901 				 errmsg("could not rename temporary statistics file \"%s\" to \"%s\": %m",
4902 						tmpfile, statfile)));
4903 		unlink(tmpfile);
4904 	}
4905 
4906 	if (permanent)
4907 	{
4908 		get_dbstat_filename(false, false, dbid, statfile, MAXPGPATH);
4909 
4910 		elog(DEBUG2, "removing temporary stats file \"%s\"", statfile);
4911 		unlink(statfile);
4912 	}
4913 }
4914 
4915 /* ----------
4916  * pgstat_read_statsfiles() -
4917  *
4918  *	Reads in some existing statistics collector files and returns the
4919  *	databases hash table that is the top level of the data.
4920  *
4921  *	If 'onlydb' is not InvalidOid, it means we only want data for that DB
4922  *	plus the shared catalogs ("DB 0").  We'll still populate the DB hash
4923  *	table for all databases, but we don't bother even creating table/function
4924  *	hash tables for other databases.
4925  *
4926  *	'permanent' specifies reading from the permanent files not temporary ones.
4927  *	When true (happens only when the collector is starting up), remove the
4928  *	files after reading; the in-memory status is now authoritative, and the
4929  *	files would be out of date in case somebody else reads them.
4930  *
4931  *	If a 'deep' read is requested, table/function stats are read, otherwise
4932  *	the table/function hash tables remain empty.
4933  * ----------
4934  */
4935 static HTAB *
pgstat_read_statsfiles(Oid onlydb,bool permanent,bool deep)4936 pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
4937 {
4938 	PgStat_StatDBEntry *dbentry;
4939 	PgStat_StatDBEntry dbbuf;
4940 	HASHCTL		hash_ctl;
4941 	HTAB	   *dbhash;
4942 	FILE	   *fpin;
4943 	int32		format_id;
4944 	bool		found;
4945 	const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
4946 
4947 	/*
4948 	 * The tables will live in pgStatLocalContext.
4949 	 */
4950 	pgstat_setup_memcxt();
4951 
4952 	/*
4953 	 * Create the DB hashtable
4954 	 */
4955 	memset(&hash_ctl, 0, sizeof(hash_ctl));
4956 	hash_ctl.keysize = sizeof(Oid);
4957 	hash_ctl.entrysize = sizeof(PgStat_StatDBEntry);
4958 	hash_ctl.hcxt = pgStatLocalContext;
4959 	dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
4960 						 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
4961 
4962 	/*
4963 	 * Clear out global and archiver statistics so they start from zero in
4964 	 * case we can't load an existing statsfile.
4965 	 */
4966 	memset(&globalStats, 0, sizeof(globalStats));
4967 	memset(&archiverStats, 0, sizeof(archiverStats));
4968 
4969 	/*
4970 	 * Set the current timestamp (will be kept only in case we can't load an
4971 	 * existing statsfile).
4972 	 */
4973 	globalStats.stat_reset_timestamp = GetCurrentTimestamp();
4974 	archiverStats.stat_reset_timestamp = globalStats.stat_reset_timestamp;
4975 
4976 	/*
4977 	 * Try to open the stats file. If it doesn't exist, the backends simply
4978 	 * return zero for anything and the collector simply starts from scratch
4979 	 * with empty counters.
4980 	 *
4981 	 * ENOENT is a possibility if the stats collector is not running or has
4982 	 * not yet written the stats file the first time.  Any other failure
4983 	 * condition is suspicious.
4984 	 */
4985 	if ((fpin = AllocateFile(statfile, PG_BINARY_R)) == NULL)
4986 	{
4987 		if (errno != ENOENT)
4988 			ereport(pgStatRunningInCollector ? LOG : WARNING,
4989 					(errcode_for_file_access(),
4990 					 errmsg("could not open statistics file \"%s\": %m",
4991 							statfile)));
4992 		return dbhash;
4993 	}
4994 
4995 	/*
4996 	 * Verify it's of the expected format.
4997 	 */
4998 	if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) ||
4999 		format_id != PGSTAT_FILE_FORMAT_ID)
5000 	{
5001 		ereport(pgStatRunningInCollector ? LOG : WARNING,
5002 				(errmsg("corrupted statistics file \"%s\"", statfile)));
5003 		goto done;
5004 	}
5005 
5006 	/*
5007 	 * Read global stats struct
5008 	 */
5009 	if (fread(&globalStats, 1, sizeof(globalStats), fpin) != sizeof(globalStats))
5010 	{
5011 		ereport(pgStatRunningInCollector ? LOG : WARNING,
5012 				(errmsg("corrupted statistics file \"%s\"", statfile)));
5013 		memset(&globalStats, 0, sizeof(globalStats));
5014 		goto done;
5015 	}
5016 
5017 	/*
5018 	 * In the collector, disregard the timestamp we read from the permanent
5019 	 * stats file; we should be willing to write a temp stats file immediately
5020 	 * upon the first request from any backend.  This only matters if the old
5021 	 * file's timestamp is less than PGSTAT_STAT_INTERVAL ago, but that's not
5022 	 * an unusual scenario.
5023 	 */
5024 	if (pgStatRunningInCollector)
5025 		globalStats.stats_timestamp = 0;
5026 
5027 	/*
5028 	 * Read archiver stats struct
5029 	 */
5030 	if (fread(&archiverStats, 1, sizeof(archiverStats), fpin) != sizeof(archiverStats))
5031 	{
5032 		ereport(pgStatRunningInCollector ? LOG : WARNING,
5033 				(errmsg("corrupted statistics file \"%s\"", statfile)));
5034 		memset(&archiverStats, 0, sizeof(archiverStats));
5035 		goto done;
5036 	}
5037 
5038 	/*
5039 	 * We found an existing collector stats file. Read it and put all the
5040 	 * hashtable entries into place.
5041 	 */
5042 	for (;;)
5043 	{
5044 		switch (fgetc(fpin))
5045 		{
5046 				/*
5047 				 * 'D'	A PgStat_StatDBEntry struct describing a database
5048 				 * follows.
5049 				 */
5050 			case 'D':
5051 				if (fread(&dbbuf, 1, offsetof(PgStat_StatDBEntry, tables),
5052 						  fpin) != offsetof(PgStat_StatDBEntry, tables))
5053 				{
5054 					ereport(pgStatRunningInCollector ? LOG : WARNING,
5055 							(errmsg("corrupted statistics file \"%s\"",
5056 									statfile)));
5057 					goto done;
5058 				}
5059 
5060 				/*
5061 				 * Add to the DB hash
5062 				 */
5063 				dbentry = (PgStat_StatDBEntry *) hash_search(dbhash,
5064 															 (void *) &dbbuf.databaseid,
5065 															 HASH_ENTER,
5066 															 &found);
5067 				if (found)
5068 				{
5069 					ereport(pgStatRunningInCollector ? LOG : WARNING,
5070 							(errmsg("corrupted statistics file \"%s\"",
5071 									statfile)));
5072 					goto done;
5073 				}
5074 
5075 				memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry));
5076 				dbentry->tables = NULL;
5077 				dbentry->functions = NULL;
5078 
5079 				/*
5080 				 * In the collector, disregard the timestamp we read from the
5081 				 * permanent stats file; we should be willing to write a temp
5082 				 * stats file immediately upon the first request from any
5083 				 * backend.
5084 				 */
5085 				if (pgStatRunningInCollector)
5086 					dbentry->stats_timestamp = 0;
5087 
5088 				/*
5089 				 * Don't create tables/functions hashtables for uninteresting
5090 				 * databases.
5091 				 */
5092 				if (onlydb != InvalidOid)
5093 				{
5094 					if (dbbuf.databaseid != onlydb &&
5095 						dbbuf.databaseid != InvalidOid)
5096 						break;
5097 				}
5098 
5099 				memset(&hash_ctl, 0, sizeof(hash_ctl));
5100 				hash_ctl.keysize = sizeof(Oid);
5101 				hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
5102 				hash_ctl.hcxt = pgStatLocalContext;
5103 				dbentry->tables = hash_create("Per-database table",
5104 											  PGSTAT_TAB_HASH_SIZE,
5105 											  &hash_ctl,
5106 											  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
5107 
5108 				hash_ctl.keysize = sizeof(Oid);
5109 				hash_ctl.entrysize = sizeof(PgStat_StatFuncEntry);
5110 				hash_ctl.hcxt = pgStatLocalContext;
5111 				dbentry->functions = hash_create("Per-database function",
5112 												 PGSTAT_FUNCTION_HASH_SIZE,
5113 												 &hash_ctl,
5114 												 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
5115 
5116 				/*
5117 				 * If requested, read the data from the database-specific
5118 				 * file.  Otherwise we just leave the hashtables empty.
5119 				 */
5120 				if (deep)
5121 					pgstat_read_db_statsfile(dbentry->databaseid,
5122 											 dbentry->tables,
5123 											 dbentry->functions,
5124 											 permanent);
5125 
5126 				break;
5127 
5128 			case 'E':
5129 				goto done;
5130 
5131 			default:
5132 				ereport(pgStatRunningInCollector ? LOG : WARNING,
5133 						(errmsg("corrupted statistics file \"%s\"",
5134 								statfile)));
5135 				goto done;
5136 		}
5137 	}
5138 
5139 done:
5140 	FreeFile(fpin);
5141 
5142 	/* If requested to read the permanent file, also get rid of it. */
5143 	if (permanent)
5144 	{
5145 		elog(DEBUG2, "removing permanent stats file \"%s\"", statfile);
5146 		unlink(statfile);
5147 	}
5148 
5149 	return dbhash;
5150 }
5151 
5152 
5153 /* ----------
5154  * pgstat_read_db_statsfile() -
5155  *
5156  *	Reads in the existing statistics collector file for the given database,
5157  *	filling the passed-in tables and functions hash tables.
5158  *
5159  *	As in pgstat_read_statsfiles, if the permanent file is requested, it is
5160  *	removed after reading.
5161  *
5162  *	Note: this code has the ability to skip storing per-table or per-function
5163  *	data, if NULL is passed for the corresponding hashtable.  That's not used
5164  *	at the moment though.
5165  * ----------
5166  */
5167 static void
pgstat_read_db_statsfile(Oid databaseid,HTAB * tabhash,HTAB * funchash,bool permanent)5168 pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
5169 						 bool permanent)
5170 {
5171 	PgStat_StatTabEntry *tabentry;
5172 	PgStat_StatTabEntry tabbuf;
5173 	PgStat_StatFuncEntry funcbuf;
5174 	PgStat_StatFuncEntry *funcentry;
5175 	FILE	   *fpin;
5176 	int32		format_id;
5177 	bool		found;
5178 	char		statfile[MAXPGPATH];
5179 
5180 	get_dbstat_filename(permanent, false, databaseid, statfile, MAXPGPATH);
5181 
5182 	/*
5183 	 * Try to open the stats file. If it doesn't exist, the backends simply
5184 	 * return zero for anything and the collector simply starts from scratch
5185 	 * with empty counters.
5186 	 *
5187 	 * ENOENT is a possibility if the stats collector is not running or has
5188 	 * not yet written the stats file the first time.  Any other failure
5189 	 * condition is suspicious.
5190 	 */
5191 	if ((fpin = AllocateFile(statfile, PG_BINARY_R)) == NULL)
5192 	{
5193 		if (errno != ENOENT)
5194 			ereport(pgStatRunningInCollector ? LOG : WARNING,
5195 					(errcode_for_file_access(),
5196 					 errmsg("could not open statistics file \"%s\": %m",
5197 							statfile)));
5198 		return;
5199 	}
5200 
5201 	/*
5202 	 * Verify it's of the expected format.
5203 	 */
5204 	if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) ||
5205 		format_id != PGSTAT_FILE_FORMAT_ID)
5206 	{
5207 		ereport(pgStatRunningInCollector ? LOG : WARNING,
5208 				(errmsg("corrupted statistics file \"%s\"", statfile)));
5209 		goto done;
5210 	}
5211 
5212 	/*
5213 	 * We found an existing collector stats file. Read it and put all the
5214 	 * hashtable entries into place.
5215 	 */
5216 	for (;;)
5217 	{
5218 		switch (fgetc(fpin))
5219 		{
5220 				/*
5221 				 * 'T'	A PgStat_StatTabEntry follows.
5222 				 */
5223 			case 'T':
5224 				if (fread(&tabbuf, 1, sizeof(PgStat_StatTabEntry),
5225 						  fpin) != sizeof(PgStat_StatTabEntry))
5226 				{
5227 					ereport(pgStatRunningInCollector ? LOG : WARNING,
5228 							(errmsg("corrupted statistics file \"%s\"",
5229 									statfile)));
5230 					goto done;
5231 				}
5232 
5233 				/*
5234 				 * Skip if table data not wanted.
5235 				 */
5236 				if (tabhash == NULL)
5237 					break;
5238 
5239 				tabentry = (PgStat_StatTabEntry *) hash_search(tabhash,
5240 															   (void *) &tabbuf.tableid,
5241 															   HASH_ENTER, &found);
5242 
5243 				if (found)
5244 				{
5245 					ereport(pgStatRunningInCollector ? LOG : WARNING,
5246 							(errmsg("corrupted statistics file \"%s\"",
5247 									statfile)));
5248 					goto done;
5249 				}
5250 
5251 				memcpy(tabentry, &tabbuf, sizeof(tabbuf));
5252 				break;
5253 
5254 				/*
5255 				 * 'F'	A PgStat_StatFuncEntry follows.
5256 				 */
5257 			case 'F':
5258 				if (fread(&funcbuf, 1, sizeof(PgStat_StatFuncEntry),
5259 						  fpin) != sizeof(PgStat_StatFuncEntry))
5260 				{
5261 					ereport(pgStatRunningInCollector ? LOG : WARNING,
5262 							(errmsg("corrupted statistics file \"%s\"",
5263 									statfile)));
5264 					goto done;
5265 				}
5266 
5267 				/*
5268 				 * Skip if function data not wanted.
5269 				 */
5270 				if (funchash == NULL)
5271 					break;
5272 
5273 				funcentry = (PgStat_StatFuncEntry *) hash_search(funchash,
5274 																 (void *) &funcbuf.functionid,
5275 																 HASH_ENTER, &found);
5276 
5277 				if (found)
5278 				{
5279 					ereport(pgStatRunningInCollector ? LOG : WARNING,
5280 							(errmsg("corrupted statistics file \"%s\"",
5281 									statfile)));
5282 					goto done;
5283 				}
5284 
5285 				memcpy(funcentry, &funcbuf, sizeof(funcbuf));
5286 				break;
5287 
5288 				/*
5289 				 * 'E'	The EOF marker of a complete stats file.
5290 				 */
5291 			case 'E':
5292 				goto done;
5293 
5294 			default:
5295 				ereport(pgStatRunningInCollector ? LOG : WARNING,
5296 						(errmsg("corrupted statistics file \"%s\"",
5297 								statfile)));
5298 				goto done;
5299 		}
5300 	}
5301 
5302 done:
5303 	FreeFile(fpin);
5304 
5305 	if (permanent)
5306 	{
5307 		elog(DEBUG2, "removing permanent stats file \"%s\"", statfile);
5308 		unlink(statfile);
5309 	}
5310 }
5311 
5312 /* ----------
5313  * pgstat_read_db_statsfile_timestamp() -
5314  *
5315  *	Attempt to determine the timestamp of the last db statfile write.
5316  *	Returns TRUE if successful; the timestamp is stored in *ts.
5317  *
5318  *	This needs to be careful about handling databases for which no stats file
5319  *	exists, such as databases without a stat entry or those not yet written:
5320  *
5321  *	- if there's a database entry in the global file, return the corresponding
5322  *	stats_timestamp value.
5323  *
5324  *	- if there's no db stat entry (e.g. for a new or inactive database),
5325  *	there's no stats_timestamp value, but also nothing to write so we return
5326  *	the timestamp of the global statfile.
5327  * ----------
5328  */
5329 static bool
pgstat_read_db_statsfile_timestamp(Oid databaseid,bool permanent,TimestampTz * ts)5330 pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
5331 								   TimestampTz *ts)
5332 {
5333 	PgStat_StatDBEntry dbentry;
5334 	PgStat_GlobalStats myGlobalStats;
5335 	PgStat_ArchiverStats myArchiverStats;
5336 	FILE	   *fpin;
5337 	int32		format_id;
5338 	const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
5339 
5340 	/*
5341 	 * Try to open the stats file.  As above, anything but ENOENT is worthy of
5342 	 * complaining about.
5343 	 */
5344 	if ((fpin = AllocateFile(statfile, PG_BINARY_R)) == NULL)
5345 	{
5346 		if (errno != ENOENT)
5347 			ereport(pgStatRunningInCollector ? LOG : WARNING,
5348 					(errcode_for_file_access(),
5349 					 errmsg("could not open statistics file \"%s\": %m",
5350 							statfile)));
5351 		return false;
5352 	}
5353 
5354 	/*
5355 	 * Verify it's of the expected format.
5356 	 */
5357 	if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) ||
5358 		format_id != PGSTAT_FILE_FORMAT_ID)
5359 	{
5360 		ereport(pgStatRunningInCollector ? LOG : WARNING,
5361 				(errmsg("corrupted statistics file \"%s\"", statfile)));
5362 		FreeFile(fpin);
5363 		return false;
5364 	}
5365 
5366 	/*
5367 	 * Read global stats struct
5368 	 */
5369 	if (fread(&myGlobalStats, 1, sizeof(myGlobalStats),
5370 			  fpin) != sizeof(myGlobalStats))
5371 	{
5372 		ereport(pgStatRunningInCollector ? LOG : WARNING,
5373 				(errmsg("corrupted statistics file \"%s\"", statfile)));
5374 		FreeFile(fpin);
5375 		return false;
5376 	}
5377 
5378 	/*
5379 	 * Read archiver stats struct
5380 	 */
5381 	if (fread(&myArchiverStats, 1, sizeof(myArchiverStats),
5382 			  fpin) != sizeof(myArchiverStats))
5383 	{
5384 		ereport(pgStatRunningInCollector ? LOG : WARNING,
5385 				(errmsg("corrupted statistics file \"%s\"", statfile)));
5386 		FreeFile(fpin);
5387 		return false;
5388 	}
5389 
5390 	/* By default, we're going to return the timestamp of the global file. */
5391 	*ts = myGlobalStats.stats_timestamp;
5392 
5393 	/*
5394 	 * We found an existing collector stats file.  Read it and look for a
5395 	 * record for the requested database.  If found, use its timestamp.
5396 	 */
5397 	for (;;)
5398 	{
5399 		switch (fgetc(fpin))
5400 		{
5401 				/*
5402 				 * 'D'	A PgStat_StatDBEntry struct describing a database
5403 				 * follows.
5404 				 */
5405 			case 'D':
5406 				if (fread(&dbentry, 1, offsetof(PgStat_StatDBEntry, tables),
5407 						  fpin) != offsetof(PgStat_StatDBEntry, tables))
5408 				{
5409 					ereport(pgStatRunningInCollector ? LOG : WARNING,
5410 							(errmsg("corrupted statistics file \"%s\"",
5411 									statfile)));
5412 					goto done;
5413 				}
5414 
5415 				/*
5416 				 * If this is the DB we're looking for, save its timestamp and
5417 				 * we're done.
5418 				 */
5419 				if (dbentry.databaseid == databaseid)
5420 				{
5421 					*ts = dbentry.stats_timestamp;
5422 					goto done;
5423 				}
5424 
5425 				break;
5426 
5427 			case 'E':
5428 				goto done;
5429 
5430 			default:
5431 				ereport(pgStatRunningInCollector ? LOG : WARNING,
5432 						(errmsg("corrupted statistics file \"%s\"",
5433 								statfile)));
5434 				goto done;
5435 		}
5436 	}
5437 
5438 done:
5439 	FreeFile(fpin);
5440 	return true;
5441 }
5442 
5443 /*
5444  * If not already done, read the statistics collector stats file into
5445  * some hash tables.  The results will be kept until pgstat_clear_snapshot()
5446  * is called (typically, at end of transaction).
5447  */
5448 static void
backend_read_statsfile(void)5449 backend_read_statsfile(void)
5450 {
5451 	TimestampTz min_ts = 0;
5452 	TimestampTz ref_ts = 0;
5453 	Oid			inquiry_db;
5454 	int			count;
5455 
5456 	/* already read it? */
5457 	if (pgStatDBHash)
5458 		return;
5459 	Assert(!pgStatRunningInCollector);
5460 
5461 	/*
5462 	 * In a normal backend, we check staleness of the data for our own DB, and
5463 	 * so we send MyDatabaseId in inquiry messages.  In the autovac launcher,
5464 	 * check staleness of the shared-catalog data, and send InvalidOid in
5465 	 * inquiry messages so as not to force writing unnecessary data.
5466 	 */
5467 	if (IsAutoVacuumLauncherProcess())
5468 		inquiry_db = InvalidOid;
5469 	else
5470 		inquiry_db = MyDatabaseId;
5471 
5472 	/*
5473 	 * Loop until fresh enough stats file is available or we ran out of time.
5474 	 * The stats inquiry message is sent repeatedly in case collector drops
5475 	 * it; but not every single time, as that just swamps the collector.
5476 	 */
5477 	for (count = 0; count < PGSTAT_POLL_LOOP_COUNT; count++)
5478 	{
5479 		bool		ok;
5480 		TimestampTz file_ts = 0;
5481 		TimestampTz cur_ts;
5482 
5483 		CHECK_FOR_INTERRUPTS();
5484 
5485 		ok = pgstat_read_db_statsfile_timestamp(inquiry_db, false, &file_ts);
5486 
5487 		cur_ts = GetCurrentTimestamp();
5488 		/* Calculate min acceptable timestamp, if we didn't already */
5489 		if (count == 0 || cur_ts < ref_ts)
5490 		{
5491 			/*
5492 			 * We set the minimum acceptable timestamp to PGSTAT_STAT_INTERVAL
5493 			 * msec before now.  This indirectly ensures that the collector
5494 			 * needn't write the file more often than PGSTAT_STAT_INTERVAL. In
5495 			 * an autovacuum worker, however, we want a lower delay to avoid
5496 			 * using stale data, so we use PGSTAT_RETRY_DELAY (since the
5497 			 * number of workers is low, this shouldn't be a problem).
5498 			 *
5499 			 * We don't recompute min_ts after sleeping, except in the
5500 			 * unlikely case that cur_ts went backwards.  So we might end up
5501 			 * accepting a file a bit older than PGSTAT_STAT_INTERVAL.  In
5502 			 * practice that shouldn't happen, though, as long as the sleep
5503 			 * time is less than PGSTAT_STAT_INTERVAL; and we don't want to
5504 			 * tell the collector that our cutoff time is less than what we'd
5505 			 * actually accept.
5506 			 */
5507 			ref_ts = cur_ts;
5508 			if (IsAutoVacuumWorkerProcess())
5509 				min_ts = TimestampTzPlusMilliseconds(ref_ts,
5510 													 -PGSTAT_RETRY_DELAY);
5511 			else
5512 				min_ts = TimestampTzPlusMilliseconds(ref_ts,
5513 													 -PGSTAT_STAT_INTERVAL);
5514 		}
5515 
5516 		/*
5517 		 * If the file timestamp is actually newer than cur_ts, we must have
5518 		 * had a clock glitch (system time went backwards) or there is clock
5519 		 * skew between our processor and the stats collector's processor.
5520 		 * Accept the file, but send an inquiry message anyway to make
5521 		 * pgstat_recv_inquiry do a sanity check on the collector's time.
5522 		 */
5523 		if (ok && file_ts > cur_ts)
5524 		{
5525 			/*
5526 			 * A small amount of clock skew between processors isn't terribly
5527 			 * surprising, but a large difference is worth logging.  We
5528 			 * arbitrarily define "large" as 1000 msec.
5529 			 */
5530 			if (file_ts >= TimestampTzPlusMilliseconds(cur_ts, 1000))
5531 			{
5532 				char	   *filetime;
5533 				char	   *mytime;
5534 
5535 				/* Copy because timestamptz_to_str returns a static buffer */
5536 				filetime = pstrdup(timestamptz_to_str(file_ts));
5537 				mytime = pstrdup(timestamptz_to_str(cur_ts));
5538 				elog(LOG, "stats collector's time %s is later than backend local time %s",
5539 					 filetime, mytime);
5540 				pfree(filetime);
5541 				pfree(mytime);
5542 			}
5543 
5544 			pgstat_send_inquiry(cur_ts, min_ts, inquiry_db);
5545 			break;
5546 		}
5547 
5548 		/* Normal acceptance case: file is not older than cutoff time */
5549 		if (ok && file_ts >= min_ts)
5550 			break;
5551 
5552 		/* Not there or too old, so kick the collector and wait a bit */
5553 		if ((count % PGSTAT_INQ_LOOP_COUNT) == 0)
5554 			pgstat_send_inquiry(cur_ts, min_ts, inquiry_db);
5555 
5556 		pg_usleep(PGSTAT_RETRY_DELAY * 1000L);
5557 	}
5558 
5559 	if (count >= PGSTAT_POLL_LOOP_COUNT)
5560 		ereport(LOG,
5561 				(errmsg("using stale statistics instead of current ones "
5562 						"because stats collector is not responding")));
5563 
5564 	/*
5565 	 * Autovacuum launcher wants stats about all databases, but a shallow read
5566 	 * is sufficient.  Regular backends want a deep read for just the tables
5567 	 * they can see (MyDatabaseId + shared catalogs).
5568 	 */
5569 	if (IsAutoVacuumLauncherProcess())
5570 		pgStatDBHash = pgstat_read_statsfiles(InvalidOid, false, false);
5571 	else
5572 		pgStatDBHash = pgstat_read_statsfiles(MyDatabaseId, false, true);
5573 }
5574 
5575 
5576 /* ----------
5577  * pgstat_setup_memcxt() -
5578  *
5579  *	Create pgStatLocalContext, if not already done.
5580  * ----------
5581  */
5582 static void
pgstat_setup_memcxt(void)5583 pgstat_setup_memcxt(void)
5584 {
5585 	if (!pgStatLocalContext)
5586 		pgStatLocalContext = AllocSetContextCreate(TopMemoryContext,
5587 												   "Statistics snapshot",
5588 												   ALLOCSET_SMALL_SIZES);
5589 }
5590 
5591 
5592 /* ----------
5593  * pgstat_clear_snapshot() -
5594  *
5595  *	Discard any data collected in the current transaction.  Any subsequent
5596  *	request will cause new snapshots to be read.
5597  *
5598  *	This is also invoked during transaction commit or abort to discard
5599  *	the no-longer-wanted snapshot.
5600  * ----------
5601  */
5602 void
pgstat_clear_snapshot(void)5603 pgstat_clear_snapshot(void)
5604 {
5605 	/* Release memory, if any was allocated */
5606 	if (pgStatLocalContext)
5607 		MemoryContextDelete(pgStatLocalContext);
5608 
5609 	/* Reset variables */
5610 	pgStatLocalContext = NULL;
5611 	pgStatDBHash = NULL;
5612 	localBackendStatusTable = NULL;
5613 	localNumBackends = 0;
5614 }
5615 
5616 
5617 /* ----------
5618  * pgstat_recv_inquiry() -
5619  *
5620  *	Process stat inquiry requests.
5621  * ----------
5622  */
5623 static void
pgstat_recv_inquiry(PgStat_MsgInquiry * msg,int len)5624 pgstat_recv_inquiry(PgStat_MsgInquiry *msg, int len)
5625 {
5626 	PgStat_StatDBEntry *dbentry;
5627 
5628 	elog(DEBUG2, "received inquiry for database %u", msg->databaseid);
5629 
5630 	/*
5631 	 * If there's already a write request for this DB, there's nothing to do.
5632 	 *
5633 	 * Note that if a request is found, we return early and skip the below
5634 	 * check for clock skew.  This is okay, since the only way for a DB
5635 	 * request to be present in the list is that we have been here since the
5636 	 * last write round.  It seems sufficient to check for clock skew once per
5637 	 * write round.
5638 	 */
5639 	if (list_member_oid(pending_write_requests, msg->databaseid))
5640 		return;
5641 
5642 	/*
5643 	 * Check to see if we last wrote this database at a time >= the requested
5644 	 * cutoff time.  If so, this is a stale request that was generated before
5645 	 * we updated the DB file, and we don't need to do so again.
5646 	 *
5647 	 * If the requestor's local clock time is older than stats_timestamp, we
5648 	 * should suspect a clock glitch, ie system time going backwards; though
5649 	 * the more likely explanation is just delayed message receipt.  It is
5650 	 * worth expending a GetCurrentTimestamp call to be sure, since a large
5651 	 * retreat in the system clock reading could otherwise cause us to neglect
5652 	 * to update the stats file for a long time.
5653 	 */
5654 	dbentry = pgstat_get_db_entry(msg->databaseid, false);
5655 	if (dbentry == NULL)
5656 	{
5657 		/*
5658 		 * We have no data for this DB.  Enter a write request anyway so that
5659 		 * the global stats will get updated.  This is needed to prevent
5660 		 * backend_read_statsfile from waiting for data that we cannot supply,
5661 		 * in the case of a new DB that nobody has yet reported any stats for.
5662 		 * See the behavior of pgstat_read_db_statsfile_timestamp.
5663 		 */
5664 	}
5665 	else if (msg->clock_time < dbentry->stats_timestamp)
5666 	{
5667 		TimestampTz cur_ts = GetCurrentTimestamp();
5668 
5669 		if (cur_ts < dbentry->stats_timestamp)
5670 		{
5671 			/*
5672 			 * Sure enough, time went backwards.  Force a new stats file write
5673 			 * to get back in sync; but first, log a complaint.
5674 			 */
5675 			char	   *writetime;
5676 			char	   *mytime;
5677 
5678 			/* Copy because timestamptz_to_str returns a static buffer */
5679 			writetime = pstrdup(timestamptz_to_str(dbentry->stats_timestamp));
5680 			mytime = pstrdup(timestamptz_to_str(cur_ts));
5681 			elog(LOG,
5682 				 "stats_timestamp %s is later than collector's time %s for database %u",
5683 				 writetime, mytime, dbentry->databaseid);
5684 			pfree(writetime);
5685 			pfree(mytime);
5686 		}
5687 		else
5688 		{
5689 			/*
5690 			 * Nope, it's just an old request.  Assuming msg's clock_time is
5691 			 * >= its cutoff_time, it must be stale, so we can ignore it.
5692 			 */
5693 			return;
5694 		}
5695 	}
5696 	else if (msg->cutoff_time <= dbentry->stats_timestamp)
5697 	{
5698 		/* Stale request, ignore it */
5699 		return;
5700 	}
5701 
5702 	/*
5703 	 * We need to write this DB, so create a request.
5704 	 */
5705 	pending_write_requests = lappend_oid(pending_write_requests,
5706 										 msg->databaseid);
5707 }
5708 
5709 
5710 /* ----------
5711  * pgstat_recv_tabstat() -
5712  *
5713  *	Count what the backend has done.
5714  * ----------
5715  */
5716 static void
pgstat_recv_tabstat(PgStat_MsgTabstat * msg,int len)5717 pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len)
5718 {
5719 	PgStat_StatDBEntry *dbentry;
5720 	PgStat_StatTabEntry *tabentry;
5721 	int			i;
5722 	bool		found;
5723 
5724 	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5725 
5726 	/*
5727 	 * Update database-wide stats.
5728 	 */
5729 	dbentry->n_xact_commit += (PgStat_Counter) (msg->m_xact_commit);
5730 	dbentry->n_xact_rollback += (PgStat_Counter) (msg->m_xact_rollback);
5731 	dbentry->n_block_read_time += msg->m_block_read_time;
5732 	dbentry->n_block_write_time += msg->m_block_write_time;
5733 
5734 	/*
5735 	 * Process all table entries in the message.
5736 	 */
5737 	for (i = 0; i < msg->m_nentries; i++)
5738 	{
5739 		PgStat_TableEntry *tabmsg = &(msg->m_entry[i]);
5740 
5741 		tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
5742 													   (void *) &(tabmsg->t_id),
5743 													   HASH_ENTER, &found);
5744 
5745 		if (!found)
5746 		{
5747 			/*
5748 			 * If it's a new table entry, initialize counters to the values we
5749 			 * just got.
5750 			 */
5751 			tabentry->numscans = tabmsg->t_counts.t_numscans;
5752 			tabentry->tuples_returned = tabmsg->t_counts.t_tuples_returned;
5753 			tabentry->tuples_fetched = tabmsg->t_counts.t_tuples_fetched;
5754 			tabentry->tuples_inserted = tabmsg->t_counts.t_tuples_inserted;
5755 			tabentry->tuples_updated = tabmsg->t_counts.t_tuples_updated;
5756 			tabentry->tuples_deleted = tabmsg->t_counts.t_tuples_deleted;
5757 			tabentry->tuples_hot_updated = tabmsg->t_counts.t_tuples_hot_updated;
5758 			tabentry->n_live_tuples = tabmsg->t_counts.t_delta_live_tuples;
5759 			tabentry->n_dead_tuples = tabmsg->t_counts.t_delta_dead_tuples;
5760 			tabentry->changes_since_analyze = tabmsg->t_counts.t_changed_tuples;
5761 			tabentry->blocks_fetched = tabmsg->t_counts.t_blocks_fetched;
5762 			tabentry->blocks_hit = tabmsg->t_counts.t_blocks_hit;
5763 
5764 			tabentry->vacuum_timestamp = 0;
5765 			tabentry->vacuum_count = 0;
5766 			tabentry->autovac_vacuum_timestamp = 0;
5767 			tabentry->autovac_vacuum_count = 0;
5768 			tabentry->analyze_timestamp = 0;
5769 			tabentry->analyze_count = 0;
5770 			tabentry->autovac_analyze_timestamp = 0;
5771 			tabentry->autovac_analyze_count = 0;
5772 		}
5773 		else
5774 		{
5775 			/*
5776 			 * Otherwise add the values to the existing entry.
5777 			 */
5778 			tabentry->numscans += tabmsg->t_counts.t_numscans;
5779 			tabentry->tuples_returned += tabmsg->t_counts.t_tuples_returned;
5780 			tabentry->tuples_fetched += tabmsg->t_counts.t_tuples_fetched;
5781 			tabentry->tuples_inserted += tabmsg->t_counts.t_tuples_inserted;
5782 			tabentry->tuples_updated += tabmsg->t_counts.t_tuples_updated;
5783 			tabentry->tuples_deleted += tabmsg->t_counts.t_tuples_deleted;
5784 			tabentry->tuples_hot_updated += tabmsg->t_counts.t_tuples_hot_updated;
5785 			/* If table was truncated, first reset the live/dead counters */
5786 			if (tabmsg->t_counts.t_truncated)
5787 			{
5788 				tabentry->n_live_tuples = 0;
5789 				tabentry->n_dead_tuples = 0;
5790 			}
5791 			tabentry->n_live_tuples += tabmsg->t_counts.t_delta_live_tuples;
5792 			tabentry->n_dead_tuples += tabmsg->t_counts.t_delta_dead_tuples;
5793 			tabentry->changes_since_analyze += tabmsg->t_counts.t_changed_tuples;
5794 			tabentry->blocks_fetched += tabmsg->t_counts.t_blocks_fetched;
5795 			tabentry->blocks_hit += tabmsg->t_counts.t_blocks_hit;
5796 		}
5797 
5798 		/* Clamp n_live_tuples in case of negative delta_live_tuples */
5799 		tabentry->n_live_tuples = Max(tabentry->n_live_tuples, 0);
5800 		/* Likewise for n_dead_tuples */
5801 		tabentry->n_dead_tuples = Max(tabentry->n_dead_tuples, 0);
5802 
5803 		/*
5804 		 * Add per-table stats to the per-database entry, too.
5805 		 */
5806 		dbentry->n_tuples_returned += tabmsg->t_counts.t_tuples_returned;
5807 		dbentry->n_tuples_fetched += tabmsg->t_counts.t_tuples_fetched;
5808 		dbentry->n_tuples_inserted += tabmsg->t_counts.t_tuples_inserted;
5809 		dbentry->n_tuples_updated += tabmsg->t_counts.t_tuples_updated;
5810 		dbentry->n_tuples_deleted += tabmsg->t_counts.t_tuples_deleted;
5811 		dbentry->n_blocks_fetched += tabmsg->t_counts.t_blocks_fetched;
5812 		dbentry->n_blocks_hit += tabmsg->t_counts.t_blocks_hit;
5813 	}
5814 }
5815 
5816 
5817 /* ----------
5818  * pgstat_recv_tabpurge() -
5819  *
5820  *	Arrange for dead table removal.
5821  * ----------
5822  */
5823 static void
pgstat_recv_tabpurge(PgStat_MsgTabpurge * msg,int len)5824 pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len)
5825 {
5826 	PgStat_StatDBEntry *dbentry;
5827 	int			i;
5828 
5829 	dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
5830 
5831 	/*
5832 	 * No need to purge if we don't even know the database.
5833 	 */
5834 	if (!dbentry || !dbentry->tables)
5835 		return;
5836 
5837 	/*
5838 	 * Process all table entries in the message.
5839 	 */
5840 	for (i = 0; i < msg->m_nentries; i++)
5841 	{
5842 		/* Remove from hashtable if present; we don't care if it's not. */
5843 		(void) hash_search(dbentry->tables,
5844 						   (void *) &(msg->m_tableid[i]),
5845 						   HASH_REMOVE, NULL);
5846 	}
5847 }
5848 
5849 
5850 /* ----------
5851  * pgstat_recv_dropdb() -
5852  *
5853  *	Arrange for dead database removal
5854  * ----------
5855  */
5856 static void
pgstat_recv_dropdb(PgStat_MsgDropdb * msg,int len)5857 pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len)
5858 {
5859 	Oid			dbid = msg->m_databaseid;
5860 	PgStat_StatDBEntry *dbentry;
5861 
5862 	/*
5863 	 * Lookup the database in the hashtable.
5864 	 */
5865 	dbentry = pgstat_get_db_entry(dbid, false);
5866 
5867 	/*
5868 	 * If found, remove it (along with the db statfile).
5869 	 */
5870 	if (dbentry)
5871 	{
5872 		char		statfile[MAXPGPATH];
5873 
5874 		get_dbstat_filename(false, false, dbid, statfile, MAXPGPATH);
5875 
5876 		elog(DEBUG2, "removing stats file \"%s\"", statfile);
5877 		unlink(statfile);
5878 
5879 		if (dbentry->tables != NULL)
5880 			hash_destroy(dbentry->tables);
5881 		if (dbentry->functions != NULL)
5882 			hash_destroy(dbentry->functions);
5883 
5884 		if (hash_search(pgStatDBHash,
5885 						(void *) &dbid,
5886 						HASH_REMOVE, NULL) == NULL)
5887 			ereport(ERROR,
5888 					(errmsg("database hash table corrupted during cleanup --- abort")));
5889 	}
5890 }
5891 
5892 
5893 /* ----------
5894  * pgstat_recv_resetcounter() -
5895  *
5896  *	Reset the statistics for the specified database.
5897  * ----------
5898  */
5899 static void
pgstat_recv_resetcounter(PgStat_MsgResetcounter * msg,int len)5900 pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len)
5901 {
5902 	PgStat_StatDBEntry *dbentry;
5903 
5904 	/*
5905 	 * Lookup the database in the hashtable.  Nothing to do if not there.
5906 	 */
5907 	dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
5908 
5909 	if (!dbentry)
5910 		return;
5911 
5912 	/*
5913 	 * We simply throw away all the database's table entries by recreating a
5914 	 * new hash table for them.
5915 	 */
5916 	if (dbentry->tables != NULL)
5917 		hash_destroy(dbentry->tables);
5918 	if (dbentry->functions != NULL)
5919 		hash_destroy(dbentry->functions);
5920 
5921 	dbentry->tables = NULL;
5922 	dbentry->functions = NULL;
5923 
5924 	/*
5925 	 * Reset database-level stats, too.  This creates empty hash tables for
5926 	 * tables and functions.
5927 	 */
5928 	reset_dbentry_counters(dbentry);
5929 }
5930 
5931 /* ----------
 * pgstat_recv_resetsharedcounter() -
5933  *
5934  *	Reset some shared statistics of the cluster.
5935  * ----------
5936  */
5937 static void
pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter * msg,int len)5938 pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, int len)
5939 {
5940 	if (msg->m_resettarget == RESET_BGWRITER)
5941 	{
5942 		/* Reset the global background writer statistics for the cluster. */
5943 		memset(&globalStats, 0, sizeof(globalStats));
5944 		globalStats.stat_reset_timestamp = GetCurrentTimestamp();
5945 	}
5946 	else if (msg->m_resettarget == RESET_ARCHIVER)
5947 	{
5948 		/* Reset the archiver statistics for the cluster. */
5949 		memset(&archiverStats, 0, sizeof(archiverStats));
5950 		archiverStats.stat_reset_timestamp = GetCurrentTimestamp();
5951 	}
5952 
5953 	/*
5954 	 * Presumably the sender of this message validated the target, don't
5955 	 * complain here if it's not valid
5956 	 */
5957 }
5958 
5959 /* ----------
5960  * pgstat_recv_resetsinglecounter() -
5961  *
 *	Reset the statistics for a single object
5963  * ----------
5964  */
5965 static void
pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter * msg,int len)5966 pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len)
5967 {
5968 	PgStat_StatDBEntry *dbentry;
5969 
5970 	dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
5971 
5972 	if (!dbentry)
5973 		return;
5974 
5975 	/* Set the reset timestamp for the whole database */
5976 	dbentry->stat_reset_timestamp = GetCurrentTimestamp();
5977 
5978 	/* Remove object if it exists, ignore it if not */
5979 	if (msg->m_resettype == RESET_TABLE)
5980 		(void) hash_search(dbentry->tables, (void *) &(msg->m_objectid),
5981 						   HASH_REMOVE, NULL);
5982 	else if (msg->m_resettype == RESET_FUNCTION)
5983 		(void) hash_search(dbentry->functions, (void *) &(msg->m_objectid),
5984 						   HASH_REMOVE, NULL);
5985 }
5986 
5987 /* ----------
5988  * pgstat_recv_autovac() -
5989  *
5990  *	Process an autovacuum signalling message.
5991  * ----------
5992  */
5993 static void
pgstat_recv_autovac(PgStat_MsgAutovacStart * msg,int len)5994 pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len)
5995 {
5996 	PgStat_StatDBEntry *dbentry;
5997 
5998 	/*
5999 	 * Store the last autovacuum time in the database's hashtable entry.
6000 	 */
6001 	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
6002 
6003 	dbentry->last_autovac_time = msg->m_start_time;
6004 }
6005 
6006 /* ----------
6007  * pgstat_recv_vacuum() -
6008  *
6009  *	Process a VACUUM message.
6010  * ----------
6011  */
6012 static void
pgstat_recv_vacuum(PgStat_MsgVacuum * msg,int len)6013 pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len)
6014 {
6015 	PgStat_StatDBEntry *dbentry;
6016 	PgStat_StatTabEntry *tabentry;
6017 
6018 	/*
6019 	 * Store the data in the table's hashtable entry.
6020 	 */
6021 	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
6022 
6023 	tabentry = pgstat_get_tab_entry(dbentry, msg->m_tableoid, true);
6024 
6025 	tabentry->n_live_tuples = msg->m_live_tuples;
6026 	tabentry->n_dead_tuples = msg->m_dead_tuples;
6027 
6028 	if (msg->m_autovacuum)
6029 	{
6030 		tabentry->autovac_vacuum_timestamp = msg->m_vacuumtime;
6031 		tabentry->autovac_vacuum_count++;
6032 	}
6033 	else
6034 	{
6035 		tabentry->vacuum_timestamp = msg->m_vacuumtime;
6036 		tabentry->vacuum_count++;
6037 	}
6038 }
6039 
6040 /* ----------
6041  * pgstat_recv_analyze() -
6042  *
6043  *	Process an ANALYZE message.
6044  * ----------
6045  */
6046 static void
pgstat_recv_analyze(PgStat_MsgAnalyze * msg,int len)6047 pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len)
6048 {
6049 	PgStat_StatDBEntry *dbentry;
6050 	PgStat_StatTabEntry *tabentry;
6051 
6052 	/*
6053 	 * Store the data in the table's hashtable entry.
6054 	 */
6055 	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
6056 
6057 	tabentry = pgstat_get_tab_entry(dbentry, msg->m_tableoid, true);
6058 
6059 	tabentry->n_live_tuples = msg->m_live_tuples;
6060 	tabentry->n_dead_tuples = msg->m_dead_tuples;
6061 
6062 	/*
6063 	 * If commanded, reset changes_since_analyze to zero.  This forgets any
6064 	 * changes that were committed while the ANALYZE was in progress, but we
6065 	 * have no good way to estimate how many of those there were.
6066 	 */
6067 	if (msg->m_resetcounter)
6068 		tabentry->changes_since_analyze = 0;
6069 
6070 	if (msg->m_autovacuum)
6071 	{
6072 		tabentry->autovac_analyze_timestamp = msg->m_analyzetime;
6073 		tabentry->autovac_analyze_count++;
6074 	}
6075 	else
6076 	{
6077 		tabentry->analyze_timestamp = msg->m_analyzetime;
6078 		tabentry->analyze_count++;
6079 	}
6080 }
6081 
6082 
6083 /* ----------
6084  * pgstat_recv_archiver() -
6085  *
 *	Process an ARCHIVER message.
6087  * ----------
6088  */
6089 static void
pgstat_recv_archiver(PgStat_MsgArchiver * msg,int len)6090 pgstat_recv_archiver(PgStat_MsgArchiver *msg, int len)
6091 {
6092 	if (msg->m_failed)
6093 	{
6094 		/* Failed archival attempt */
6095 		++archiverStats.failed_count;
6096 		memcpy(archiverStats.last_failed_wal, msg->m_xlog,
6097 			   sizeof(archiverStats.last_failed_wal));
6098 		archiverStats.last_failed_timestamp = msg->m_timestamp;
6099 	}
6100 	else
6101 	{
6102 		/* Successful archival operation */
6103 		++archiverStats.archived_count;
6104 		memcpy(archiverStats.last_archived_wal, msg->m_xlog,
6105 			   sizeof(archiverStats.last_archived_wal));
6106 		archiverStats.last_archived_timestamp = msg->m_timestamp;
6107 	}
6108 }
6109 
6110 /* ----------
6111  * pgstat_recv_bgwriter() -
6112  *
6113  *	Process a BGWRITER message.
6114  * ----------
6115  */
6116 static void
pgstat_recv_bgwriter(PgStat_MsgBgWriter * msg,int len)6117 pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len)
6118 {
6119 	globalStats.timed_checkpoints += msg->m_timed_checkpoints;
6120 	globalStats.requested_checkpoints += msg->m_requested_checkpoints;
6121 	globalStats.checkpoint_write_time += msg->m_checkpoint_write_time;
6122 	globalStats.checkpoint_sync_time += msg->m_checkpoint_sync_time;
6123 	globalStats.buf_written_checkpoints += msg->m_buf_written_checkpoints;
6124 	globalStats.buf_written_clean += msg->m_buf_written_clean;
6125 	globalStats.maxwritten_clean += msg->m_maxwritten_clean;
6126 	globalStats.buf_written_backend += msg->m_buf_written_backend;
6127 	globalStats.buf_fsync_backend += msg->m_buf_fsync_backend;
6128 	globalStats.buf_alloc += msg->m_buf_alloc;
6129 }
6130 
6131 /* ----------
6132  * pgstat_recv_recoveryconflict() -
6133  *
6134  *	Process a RECOVERYCONFLICT message.
6135  * ----------
6136  */
6137 static void
pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict * msg,int len)6138 pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len)
6139 {
6140 	PgStat_StatDBEntry *dbentry;
6141 
6142 	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
6143 
6144 	switch (msg->m_reason)
6145 	{
6146 		case PROCSIG_RECOVERY_CONFLICT_DATABASE:
6147 
6148 			/*
6149 			 * Since we drop the information about the database as soon as it
6150 			 * replicates, there is no point in counting these conflicts.
6151 			 */
6152 			break;
6153 		case PROCSIG_RECOVERY_CONFLICT_TABLESPACE:
6154 			dbentry->n_conflict_tablespace++;
6155 			break;
6156 		case PROCSIG_RECOVERY_CONFLICT_LOCK:
6157 			dbentry->n_conflict_lock++;
6158 			break;
6159 		case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
6160 			dbentry->n_conflict_snapshot++;
6161 			break;
6162 		case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN:
6163 			dbentry->n_conflict_bufferpin++;
6164 			break;
6165 		case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
6166 			dbentry->n_conflict_startup_deadlock++;
6167 			break;
6168 	}
6169 }
6170 
6171 /* ----------
6172  * pgstat_recv_deadlock() -
6173  *
6174  *	Process a DEADLOCK message.
6175  * ----------
6176  */
6177 static void
pgstat_recv_deadlock(PgStat_MsgDeadlock * msg,int len)6178 pgstat_recv_deadlock(PgStat_MsgDeadlock *msg, int len)
6179 {
6180 	PgStat_StatDBEntry *dbentry;
6181 
6182 	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
6183 
6184 	dbentry->n_deadlocks++;
6185 }
6186 
6187 /* ----------
6188  * pgstat_recv_tempfile() -
6189  *
6190  *	Process a TEMPFILE message.
6191  * ----------
6192  */
6193 static void
pgstat_recv_tempfile(PgStat_MsgTempFile * msg,int len)6194 pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len)
6195 {
6196 	PgStat_StatDBEntry *dbentry;
6197 
6198 	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
6199 
6200 	dbentry->n_temp_bytes += msg->m_filesize;
6201 	dbentry->n_temp_files += 1;
6202 }
6203 
6204 /* ----------
6205  * pgstat_recv_funcstat() -
6206  *
6207  *	Count what the backend has done.
6208  * ----------
6209  */
6210 static void
pgstat_recv_funcstat(PgStat_MsgFuncstat * msg,int len)6211 pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len)
6212 {
6213 	PgStat_FunctionEntry *funcmsg = &(msg->m_entry[0]);
6214 	PgStat_StatDBEntry *dbentry;
6215 	PgStat_StatFuncEntry *funcentry;
6216 	int			i;
6217 	bool		found;
6218 
6219 	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
6220 
6221 	/*
6222 	 * Process all function entries in the message.
6223 	 */
6224 	for (i = 0; i < msg->m_nentries; i++, funcmsg++)
6225 	{
6226 		funcentry = (PgStat_StatFuncEntry *) hash_search(dbentry->functions,
6227 														 (void *) &(funcmsg->f_id),
6228 														 HASH_ENTER, &found);
6229 
6230 		if (!found)
6231 		{
6232 			/*
6233 			 * If it's a new function entry, initialize counters to the values
6234 			 * we just got.
6235 			 */
6236 			funcentry->f_numcalls = funcmsg->f_numcalls;
6237 			funcentry->f_total_time = funcmsg->f_total_time;
6238 			funcentry->f_self_time = funcmsg->f_self_time;
6239 		}
6240 		else
6241 		{
6242 			/*
6243 			 * Otherwise add the values to the existing entry.
6244 			 */
6245 			funcentry->f_numcalls += funcmsg->f_numcalls;
6246 			funcentry->f_total_time += funcmsg->f_total_time;
6247 			funcentry->f_self_time += funcmsg->f_self_time;
6248 		}
6249 	}
6250 }
6251 
6252 /* ----------
6253  * pgstat_recv_funcpurge() -
6254  *
6255  *	Arrange for dead function removal.
6256  * ----------
6257  */
6258 static void
pgstat_recv_funcpurge(PgStat_MsgFuncpurge * msg,int len)6259 pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len)
6260 {
6261 	PgStat_StatDBEntry *dbentry;
6262 	int			i;
6263 
6264 	dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
6265 
6266 	/*
6267 	 * No need to purge if we don't even know the database.
6268 	 */
6269 	if (!dbentry || !dbentry->functions)
6270 		return;
6271 
6272 	/*
6273 	 * Process all function entries in the message.
6274 	 */
6275 	for (i = 0; i < msg->m_nentries; i++)
6276 	{
6277 		/* Remove from hashtable if present; we don't care if it's not. */
6278 		(void) hash_search(dbentry->functions,
6279 						   (void *) &(msg->m_functionid[i]),
6280 						   HASH_REMOVE, NULL);
6281 	}
6282 }
6283 
6284 /* ----------
6285  * pgstat_write_statsfile_needed() -
6286  *
6287  *	Do we need to write out any stats files?
6288  * ----------
6289  */
6290 static bool
pgstat_write_statsfile_needed(void)6291 pgstat_write_statsfile_needed(void)
6292 {
6293 	if (pending_write_requests != NIL)
6294 		return true;
6295 
6296 	/* Everything was written recently */
6297 	return false;
6298 }
6299 
6300 /* ----------
6301  * pgstat_db_requested() -
6302  *
6303  *	Checks whether stats for a particular DB need to be written to a file.
6304  * ----------
6305  */
6306 static bool
pgstat_db_requested(Oid databaseid)6307 pgstat_db_requested(Oid databaseid)
6308 {
6309 	/*
6310 	 * If any requests are outstanding at all, we should write the stats for
6311 	 * shared catalogs (the "database" with OID 0).  This ensures that
6312 	 * backends will see up-to-date stats for shared catalogs, even though
6313 	 * they send inquiry messages mentioning only their own DB.
6314 	 */
6315 	if (databaseid == InvalidOid && pending_write_requests != NIL)
6316 		return true;
6317 
6318 	/* Search to see if there's an open request to write this database. */
6319 	if (list_member_oid(pending_write_requests, databaseid))
6320 		return true;
6321 
6322 	return false;
6323 }
6324