1 /* ----------
2  * pgstat.c
3  *
4  *	All the statistics collector stuff hacked up in one big, ugly file.
5  *
6  *	TODO:	- Separate collector, postmaster and backend stuff
7  *			  into different files.
8  *
9  *			- Add some automatic call for pgstat vacuuming.
10  *
11  *			- Add a pgstat config column to pg_database, so this
12  *			  entire thing can be enabled/disabled on a per db basis.
13  *
14  *	Copyright (c) 2001-2021, PostgreSQL Global Development Group
15  *
16  *	src/backend/postmaster/pgstat.c
17  * ----------
18  */
19 #include "postgres.h"
20 
21 #include <unistd.h>
22 #include <fcntl.h>
23 #include <sys/param.h>
24 #include <sys/time.h>
25 #include <sys/socket.h>
26 #include <netdb.h>
27 #include <netinet/in.h>
28 #include <arpa/inet.h>
29 #include <signal.h>
30 #include <time.h>
31 #ifdef HAVE_SYS_SELECT_H
32 #include <sys/select.h>
33 #endif
34 
35 #include "access/heapam.h"
36 #include "access/htup_details.h"
37 #include "access/tableam.h"
38 #include "access/transam.h"
39 #include "access/twophase_rmgr.h"
40 #include "access/xact.h"
41 #include "catalog/pg_database.h"
42 #include "catalog/pg_proc.h"
43 #include "common/ip.h"
44 #include "executor/instrument.h"
45 #include "libpq/libpq.h"
46 #include "libpq/pqsignal.h"
47 #include "mb/pg_wchar.h"
48 #include "miscadmin.h"
49 #include "pgstat.h"
50 #include "postmaster/autovacuum.h"
51 #include "postmaster/fork_process.h"
52 #include "postmaster/interrupt.h"
53 #include "postmaster/postmaster.h"
54 #include "replication/slot.h"
55 #include "replication/walsender.h"
56 #include "storage/backendid.h"
57 #include "storage/dsm.h"
58 #include "storage/fd.h"
59 #include "storage/ipc.h"
60 #include "storage/latch.h"
61 #include "storage/lmgr.h"
62 #include "storage/pg_shmem.h"
63 #include "storage/proc.h"
64 #include "storage/procsignal.h"
65 #include "utils/builtins.h"
66 #include "utils/guc.h"
67 #include "utils/memutils.h"
68 #include "utils/ps_status.h"
69 #include "utils/rel.h"
70 #include "utils/snapmgr.h"
71 #include "utils/timestamp.h"
72 
/* ----------
 * Timer definitions.
 *
 * All values are in milliseconds except PGSTAT_RESTART_INTERVAL, which is
 * in seconds.
 * ----------
 */

/*
 * pgstat_report_stat() also uses this value as the minimum interval between
 * two consecutive reports sent by one backend.
 */
#define PGSTAT_STAT_INTERVAL	500 /* Minimum time between stats file
									 * updates; in milliseconds. */

#define PGSTAT_RETRY_DELAY		10	/* How long to wait between checks for a
									 * new file; in milliseconds. */

#define PGSTAT_MAX_WAIT_TIME	10000	/* Maximum time to wait for a stats
										 * file update; in milliseconds. */

#define PGSTAT_INQ_INTERVAL		640 /* How often to ping the collector for a
									 * new file; in milliseconds. */

/* pgstat_start() refuses to fork a new collector more often than this. */
#define PGSTAT_RESTART_INTERVAL 60	/* How often to attempt to restart a
									 * failed statistics collector; in
									 * seconds. */

/*
 * The two waiting periods above expressed as a number of
 * PGSTAT_RETRY_DELAY-long iterations: 10000 / 10 = 1000 and 640 / 10 = 64.
 */
#define PGSTAT_POLL_LOOP_COUNT	(PGSTAT_MAX_WAIT_TIME / PGSTAT_RETRY_DELAY)
#define PGSTAT_INQ_LOOP_COUNT	(PGSTAT_INQ_INTERVAL / PGSTAT_RETRY_DELAY)

/*
 * Minimum receive buffer size for the collector's socket (100 kB).
 * pgstat_init() raises SO_RCVBUF to this value when the current setting is
 * smaller.
 */
#define PGSTAT_MIN_RCVBUF		(100 * 1024)


/* ----------
 * The initial size hints for the hash tables used in the collector.
 * (One hint each for the database, table, function and replication-slot
 * hash tables, going by the macro names.)
 * ----------
 */
#define PGSTAT_DB_HASH_SIZE		16
#define PGSTAT_TAB_HASH_SIZE	512
#define PGSTAT_FUNCTION_HASH_SIZE	512
#define PGSTAT_REPLSLOT_HASH_SIZE	32
109 
/* ----------
 * GUC parameters
 *
 * NOTE(review): pgstat_init() sets the "track_counts" GUC to "off" when no
 * working socket could be created; presumably pgstat_track_counts is the
 * variable backing that GUC -- confirm against guc.c.
 * ----------
 */
bool		pgstat_track_counts = false;
int			pgstat_track_functions = TRACK_FUNC_OFF;

/* ----------
 * Built from GUC parameter
 *
 * pgstat_stat_directory is one of the two directories that
 * pgstat_reset_all() scans for stats files to remove.
 * ----------
 */
char	   *pgstat_stat_directory = NULL;
char	   *pgstat_stat_filename = NULL;
char	   *pgstat_stat_tmpname = NULL;

/*
 * BgWriter and WAL global statistics counters.
 * Stored directly in a stats message structure so they can be sent
 * without needing to copy things around.  We assume these init to zeroes.
 *
 * pgstat_report_stat() looks at WalStats.m_wal_write and
 * WalStats.m_wal_sync to decide whether there is WAL activity to report.
 */
PgStat_MsgBgWriter BgWriterStats;
PgStat_MsgWal WalStats;
132 
133 /*
134  * WAL usage counters saved from pgWALUsage at the previous call to
135  * pgstat_send_wal(). This is used to calculate how much WAL usage
 * happens between pgstat_send_wal() calls, by subtracting
137  * the previous counters from the current ones.
138  */
static WalUsage prevWalUsage;	/* compared with pgWalUsage in
								 * pgstat_report_stat() */

/*
 * List of SLRU names that we keep stats for.  There is no central registry of
 * SLRUs, so we use this fixed list instead.  The "other" entry is used for
 * all SLRUs without an explicit entry (e.g. SLRUs in extensions).
 */
static const char *const slru_names[] = {
	"CommitTs",
	"MultiXactMember",
	"MultiXactOffset",
	"Notify",
	"Serial",
	"Subtrans",
	"Xact",
	"other"						/* has to be last */
};

/* Number of entries in slru_names[], the trailing "other" entry included. */
#define SLRU_NUM_ELEMENTS	lengthof(slru_names)

/*
 * SLRU statistics counts waiting to be sent to the collector.  These are
 * stored directly in stats message format so they can be sent without needing
 * to copy things around.  We assume this variable inits to zeroes.  Entries
 * are one-to-one with slru_names[].
 */
static PgStat_MsgSLRU SLRUStats[SLRU_NUM_ELEMENTS];
166 
/* ----------
 * Local data
 * ----------
 */

/*
 * UDP socket used for statistics messages.  Created by pgstat_init(); it
 * stays PGINVALID_SOCKET when no working socket could be set up, in which
 * case pgstat_start() declines to launch a collector.
 */
NON_EXEC_STATIC pgsocket pgStatSock = PGINVALID_SOCKET;

/*
 * Address of pgStatSock as reported by getsockname() in pgstat_init(); the
 * socket is connected to this same address.
 */
static struct sockaddr_storage pgStatAddr;

/*
 * Time of the most recent collector launch attempt, recorded by
 * pgstat_start() and cleared by allow_immediate_pgstat_restart().
 */
static time_t last_pgstat_start_time;

/* NOTE(review): presumably set once inside the collector process -- confirm */
static bool pgStatRunningInCollector = false;
178 
/*
 * Structures in which backends store per-table info that's waiting to be
 * sent to the collector.
 *
 * NOTE: once allocated, TabStatusArray structures are never moved or deleted
 * for the life of the backend.  Also, we zero out the t_id fields of the
 * contained PgStat_TableStatus structs whenever they are not actively in use.
 * This allows relcache pgstat_info pointers to be treated as long-lived data,
 * avoiding repeated searches in pgstat_initstats() when a relation is
 * repeatedly opened during a transaction.
 *
 * pgstat_report_stat() walks the whole chain, zeroes the used entries of
 * each array and resets its tsa_used to 0 once the counts have been queued
 * for sending.
 */
#define TABSTAT_QUANTUM		100 /* we alloc this many at a time */

typedef struct TabStatusArray
{
	struct TabStatusArray *tsa_next;	/* link to next array, if any */
	int			tsa_used;		/* # entries currently used */
	PgStat_TableStatus tsa_entries[TABSTAT_QUANTUM];	/* per-table data */
} TabStatusArray;

/* Head of the chain of TabStatusArray chunks; NULL until one is allocated. */
static TabStatusArray *pgStatTabList = NULL;
200 
/*
 * pgStatTabHash entry: map from relation OID to PgStat_TableStatus pointer
 */
typedef struct TabStatHashEntry
{
	Oid			t_id;			/* relation OID (the lookup key, per the
								 * comment above) */
	PgStat_TableStatus *tsa_entry;	/* points into a TabStatusArray */
} TabStatHashEntry;

/*
 * Hash table for O(1) t_id -> tsa_entry lookup
 *
 * pgstat_report_stat() destroys this table and resets the pointer to NULL
 * before it zeroes the PgStat_TableStatus entries the table points at.
 */
static HTAB *pgStatTabHash = NULL;

/*
 * Backends store per-function info that's waiting to be sent to the collector
 * in this hash table (indexed by function OID).
 */
static HTAB *pgStatFunctions = NULL;

/*
 * Indicates if backend has some function stats that it hasn't yet
 * sent to the collector.  Consulted by pgstat_report_stat() when deciding
 * whether there is anything to report.
 */
static bool have_function_stats = false;
226 
/*
 * Tuple insertion/deletion counts for an open transaction can't be propagated
 * into PgStat_TableStatus counters until we know if it is going to commit
 * or abort.  Hence, we keep these counts in per-subxact structs that live
 * in TopTransactionContext.  This data structure is designed on the assumption
 * that subxacts won't usually modify very many tables.
 */
typedef struct PgStat_SubXactStatus
{
	int			nest_level;		/* subtransaction nest level */
	struct PgStat_SubXactStatus *prev;	/* higher-level subxact if any */
	PgStat_TableXactStatus *first;	/* head of list for this subxact */
} PgStat_SubXactStatus;

/* Innermost entry of the per-subtransaction stack; NULL when empty. */
static PgStat_SubXactStatus *pgStatXactStack = NULL;

/*
 * Per-backend counters.  The commit/rollback counts and the block I/O
 * timings are copied into the tabstat message by pgstat_send_tabstat();
 * pgstat_report_stat() also treats non-zero commit/rollback counts as
 * "something to report".
 */
static int	pgStatXactCommit = 0;
static int	pgStatXactRollback = 0;
PgStat_Counter pgStatBlockReadTime = 0;
PgStat_Counter pgStatBlockWriteTime = 0;
/* NOTE(review): presumably session-level connection statistics -- confirm */
static PgStat_Counter pgLastSessionReportTime = 0;
PgStat_Counter pgStatActiveTime = 0;
PgStat_Counter pgStatTransactionIdleTime = 0;
SessionEndType pgStatSessionEndCause = DISCONNECT_NORMAL;
251 
/*
 * Record that's written to 2PC state file when pgstat state is persisted
 *
 * NOTE(review): since this struct is written to a file, changing field
 * order or types presumably changes the stored format -- confirm before
 * touching the layout.
 */
typedef struct TwoPhasePgStatRecord
{
	PgStat_Counter tuples_inserted; /* tuples inserted in xact */
	PgStat_Counter tuples_updated;	/* tuples updated in xact */
	PgStat_Counter tuples_deleted;	/* tuples deleted in xact */
	PgStat_Counter inserted_pre_trunc;	/* tuples inserted prior to truncate */
	PgStat_Counter updated_pre_trunc;	/* tuples updated prior to truncate */
	PgStat_Counter deleted_pre_trunc;	/* tuples deleted prior to truncate */
	Oid			t_id;			/* table's OID */
	bool		t_shared;		/* is it a shared catalog? */
	bool		t_truncated;	/* was the relation truncated? */
} TwoPhasePgStatRecord;
265 
/*
 * Info about current "snapshot" of stats file
 */
/* NOTE(review): presumably the context holding the snapshot data -- confirm */
static MemoryContext pgStatLocalContext = NULL;
static HTAB *pgStatDBHash = NULL;

/*
 * Cluster wide statistics, kept in the stats collector.
 * Contains statistics that are not collected per database
 * or per table.
 *
 * slruStats[] has SLRU_NUM_ELEMENTS entries, the same count as
 * slru_names[].
 */
static PgStat_ArchiverStats archiverStats;
static PgStat_GlobalStats globalStats;
static PgStat_WalStats walStats;
static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
static HTAB *replSlotStatHash = NULL;

/*
 * List of OIDs of databases we need to write out.  If an entry is InvalidOid,
 * it means to write only the shared-catalog stats ("DB 0"); otherwise, we
 * will write both that DB's data and the shared stats.
 */
static List *pending_write_requests = NIL;

/*
 * Total time charged to functions so far in the current backend.
 * We use this to help separate "self" and "other" time charges.
 * (We assume this initializes to zero.)
 */
static instr_time total_func_time;
296 
297 
/* ----------
 * Local function forward declarations
 * ----------
 */

/* Only built with EXEC_BACKEND: launches the collector via fork + exec. */
#ifdef EXEC_BACKEND
static pid_t pgstat_forkexec(void);
#endif

/* Collector entry point; declared as never returning. */
NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]) pg_attribute_noreturn();

/* Lookup of hash entries and reading/writing of the stats files */
static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create);
static PgStat_StatTabEntry *pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry,
												 Oid tableoid, bool create);
static void pgstat_write_statsfiles(bool permanent, bool allDbs);
static void pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent);
static HTAB *pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep);
static void pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash, bool permanent);
static void backend_read_statsfile(void);

static bool pgstat_write_statsfile_needed(void);
static bool pgstat_db_requested(Oid databaseid);

static PgStat_StatReplSlotEntry *pgstat_get_replslot_entry(NameData name, bool create_it);
static void pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotstats, TimestampTz ts);

/*
 * Helpers on the sending side.  pgstat_send_tabstat(), pgstat_send_funcstats()
 * and pgstat_send_slru() are called from pgstat_report_stat().
 */
static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg, TimestampTz now);
static void pgstat_send_funcstats(void);
static void pgstat_send_slru(void);
static HTAB *pgstat_collect_oids(Oid catalogid, AttrNumber anum_oid);
static bool pgstat_should_report_connstat(void);
static void pgstat_report_disconnect(Oid dboid);

static PgStat_TableStatus *get_tabstat_entry(Oid rel_id, bool isshared);

static void pgstat_setup_memcxt(void);

static void pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype);
static void pgstat_send(void *msg, int len);

/*
 * One pgstat_recv_* routine per message type; each takes the message and
 * its length.  (Presumably invoked by the collector for incoming messages
 * -- the call sites are not in this part of the file.)
 */
static void pgstat_recv_inquiry(PgStat_MsgInquiry *msg, int len);
static void pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len);
static void pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len);
static void pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len);
static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len);
static void pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, int len);
static void pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len);
static void pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter *msg, int len);
static void pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, int len);
static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len);
static void pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len);
static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len);
static void pgstat_recv_archiver(PgStat_MsgArchiver *msg, int len);
static void pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len);
static void pgstat_recv_wal(PgStat_MsgWal *msg, int len);
static void pgstat_recv_slru(PgStat_MsgSLRU *msg, int len);
static void pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len);
static void pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len);
static void pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len);
static void pgstat_recv_deadlock(PgStat_MsgDeadlock *msg, int len);
static void pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len);
static void pgstat_recv_connect(PgStat_MsgConnect *msg, int len);
static void pgstat_recv_disconnect(PgStat_MsgDisconnect *msg, int len);
static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len);
static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
363 /* ------------------------------------------------------------
364  * Public functions called from postmaster follow
365  * ------------------------------------------------------------
366  */
367 
/*
 * Private helper for pgstat_init(): close the candidate socket and mark
 * pgStatSock as invalid so the next address candidate starts clean.
 */
static void
pgstat_init_discard_socket(void)
{
	closesocket(pgStatSock);
	pgStatSock = PGINVALID_SOCKET;
}

/* ----------
 * pgstat_init() -
 *
 *	Postmaster startup hook that sets up the UDP socket used to send and
 *	receive statistics messages.  Every address that "localhost" resolves
 *	to is tried in turn until one passes a send/receive self-test.
 *
 *	Failure is not fatal: the problem is logged, pgStatSock is left as
 *	PGINVALID_SOCKET and track_counts is switched off, so the postmaster
 *	can still come up with statistics collection disabled.
 * ----------
 */
void
pgstat_init(void)
{
	struct addrinfo hints;
	struct addrinfo *addr_list = NULL;
	struct addrinfo *cand;
	ACCEPT_TYPE_ARG3 addr_len;
	int			gai_rc;
	int			attempts = 0;

#define TESTBYTEVAL ((char) 199)

	/*
	 * Compile-time cross-check of the payload size arithmetic: a message
	 * larger than PGSTAT_MAX_MSG_SIZE would only show up as a silent
	 * performance loss from fragmentation, so catch it here instead.
	 */
	StaticAssertStmt(sizeof(PgStat_Msg) <= PGSTAT_MAX_MSG_SIZE,
					 "maximum stats message size exceeds PGSTAT_MAX_MSG_SIZE");

	/*
	 * Look up the datagram-capable addresses for "localhost".
	 */
	hints.ai_family = AF_UNSPEC;
	hints.ai_socktype = SOCK_DGRAM;
	hints.ai_protocol = 0;
	hints.ai_flags = AI_PASSIVE;
	hints.ai_addrlen = 0;
	hints.ai_addr = NULL;
	hints.ai_canonname = NULL;
	hints.ai_next = NULL;

	gai_rc = pg_getaddrinfo_all("localhost", NULL, &hints, &addr_list);
	if (gai_rc != 0 || addr_list == NULL)
	{
		ereport(LOG,
				(errmsg("could not resolve \"localhost\": %s",
						gai_strerror(gai_rc))));
		goto startup_failed;
	}

	/*
	 * The resolver may hand back several addresses of which only one really
	 * works (e.g. IPv6 plus IPv4 on a kernel that rejects IPv6), and the
	 * failure may not show up before bind() or even connect().  Hence walk
	 * the list until a candidate survives every step; bad candidates only
	 * produce LOG messages.
	 */
	for (cand = addr_list; cand != NULL; cand = cand->ai_next)
	{
		fd_set		readfds;
		struct timeval timeout;
		char		probe;
		int			nready;
		bool		transferred;

#ifdef HAVE_UNIX_SOCKETS
		/* AF_UNIX results are of no use to us. */
		if (cand->ai_family == AF_UNIX)
			continue;
#endif

		attempts++;
		if (attempts > 1)
			ereport(LOG,
					(errmsg("trying another address for the statistics collector")));

		/* Step 1: create the socket. */
		pgStatSock = socket(cand->ai_family, SOCK_DGRAM, 0);
		if (pgStatSock == PGINVALID_SOCKET)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not create socket for statistics collector: %m")));
			continue;
		}

		/* Step 2: bind to a kernel-chosen port on the candidate address. */
		if (bind(pgStatSock, cand->ai_addr, cand->ai_addrlen) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not bind socket for statistics collector: %m")));
			pgstat_init_discard_socket();
			continue;
		}

		/* Step 3: find out which address/port we actually got. */
		addr_len = sizeof(pgStatAddr);
		if (getsockname(pgStatSock, (struct sockaddr *) &pgStatAddr, &addr_len) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not get address of socket for statistics collector: %m")));
			pgstat_init_discard_socket();
			continue;
		}

		/*
		 * Step 4: connect the socket to itself.  Senders then need not name
		 * the destination on each send, and the kernel only delivers
		 * packets that originate from this very address.
		 */
		if (connect(pgStatSock, (struct sockaddr *) &pgStatAddr, addr_len) < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not connect socket for statistics collector: %m")));
			pgstat_init_discard_socket();
			continue;
		}

		/*
		 * Step 5: push one test byte through the socket.  This detects
		 * sockets that can be created but do not pass data, for example
		 * because of kernel packet filtering rules.  An interrupted send is
		 * simply repeated.
		 */
		probe = TESTBYTEVAL;
		do
		{
			transferred = (send(pgStatSock, &probe, 1, 0) == 1);
		} while (!transferred && errno == EINTR);

		if (!transferred)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not send test message on socket for statistics collector: %m")));
			pgstat_init_discard_socket();
			continue;
		}

		/*
		 * Step 6: wait for the byte to become readable.  Delivery may lag a
		 * little, so allow (arbitrarily) up to half a second per select()
		 * call; restart the wait if a signal interrupts it.
		 */
		do
		{
			FD_ZERO(&readfds);
			FD_SET(pgStatSock, &readfds);

			timeout.tv_sec = 0;
			timeout.tv_usec = 500000;
			nready = select(pgStatSock + 1, &readfds, NULL, NULL, &timeout);
		} while (nready < 0 && errno == EINTR);

		if (nready < 0)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("select() failed in statistics collector: %m")));
			pgstat_init_discard_socket();
			continue;
		}
		if (nready == 0 || !FD_ISSET(pgStatSock, &readfds))
		{
			/*
			 * The failure mode considered most likely, hence the dedicated
			 * message.  errno carries nothing meaningful at this point and
			 * is deliberately not reported.
			 */
			ereport(LOG,
					(errcode(ERRCODE_CONNECTION_FAILURE),
					 errmsg("test message did not get through on socket for statistics collector")));
			pgstat_init_discard_socket();
			continue;
		}

		/*
		 * Step 7: read the byte back.  Scribble on the buffer first so that
		 * a stale value cannot pass for a successful transfer.
		 */
		probe++;
		do
		{
			transferred = (recv(pgStatSock, &probe, 1, 0) == 1);
		} while (!transferred && errno == EINTR);

		if (!transferred)
		{
			ereport(LOG,
					(errcode_for_socket_access(),
					 errmsg("could not receive test message on socket for statistics collector: %m")));
			pgstat_init_discard_socket();
			continue;
		}

		/* Pure paranoia: the byte must come back unchanged. */
		if (probe != TESTBYTEVAL)
		{
			ereport(LOG,
					(errcode(ERRCODE_INTERNAL_ERROR),
					 errmsg("incorrect test message transmission on socket for statistics collector")));
			pgstat_init_discard_socket();
			continue;
		}

		/* This candidate passed every check; stop searching. */
		break;
	}

	/* Give up if the list was exhausted without a usable socket. */
	if (cand == NULL || pgStatSock == PGINVALID_SOCKET)
		goto startup_failed;

	/*
	 * Switch to non-blocking I/O: if the collector cannot keep up, messages
	 * get dropped rather than making backends wait on send.
	 */
	if (!pg_set_noblock(pgStatSock))
	{
		ereport(LOG,
				(errcode_for_socket_access(),
				 errmsg("could not set statistics collector socket to nonblocking mode: %m")));
		goto startup_failed;
	}

	/*
	 * Make sure the receive buffer holds at least PGSTAT_MIN_RCVBUF bytes.
	 * Losing data under heavy load is accepted with UDP, but not merely
	 * because of a tiny platform default (such as 8KB on older Windows
	 * versions).  Failures here are logged and otherwise ignored.
	 */
	{
		int			cur_size;
		int			wanted_size = PGSTAT_MIN_RCVBUF;
		ACCEPT_TYPE_ARG3 optlen = sizeof(cur_size);

		if (getsockopt(pgStatSock, SOL_SOCKET, SO_RCVBUF,
					   (char *) &cur_size, &optlen) < 0)
		{
			ereport(LOG,
					(errmsg("%s(%s) failed: %m", "getsockopt", "SO_RCVBUF")));
			/* unknown current size: force an attempt to set it */
			cur_size = 0;
		}

		if (cur_size < wanted_size &&
			setsockopt(pgStatSock, SOL_SOCKET, SO_RCVBUF,
					   (char *) &wanted_size, sizeof(wanted_size)) < 0)
			ereport(LOG,
					(errmsg("%s(%s) failed: %m", "setsockopt", "SO_RCVBUF")));
	}

	pg_freeaddrinfo_all(hints.ai_family, addr_list);

	/* The socket is long-lived from here on, so account for it in fd.c. */
	ReserveExternalFD();

	return;

startup_failed:
	ereport(LOG,
			(errmsg("disabling statistics collector for lack of working socket")));

	if (addr_list != NULL)
		pg_freeaddrinfo_all(hints.ai_family, addr_list);

	if (pgStatSock != PGINVALID_SOCKET)
		closesocket(pgStatSock);
	pgStatSock = PGINVALID_SOCKET;

	/*
	 * Turn track_counts off: this avoids pointless work and leaves a visible
	 * hint that initialization failed here.  PGC_S_OVERRIDE is used since
	 * re-enabling it from postgresql.conf cannot help without a restart.
	 */
	SetConfigOption("track_counts", "off", PGC_INTERNAL, PGC_S_OVERRIDE);
}
660 
661 /*
662  * subroutine for pgstat_reset_all
663  */
664 static void
pgstat_reset_remove_files(const char * directory)665 pgstat_reset_remove_files(const char *directory)
666 {
667 	DIR		   *dir;
668 	struct dirent *entry;
669 	char		fname[MAXPGPATH * 2];
670 
671 	dir = AllocateDir(directory);
672 	while ((entry = ReadDir(dir, directory)) != NULL)
673 	{
674 		int			nchars;
675 		Oid			tmp_oid;
676 
677 		/*
678 		 * Skip directory entries that don't match the file names we write.
679 		 * See get_dbstat_filename for the database-specific pattern.
680 		 */
681 		if (strncmp(entry->d_name, "global.", 7) == 0)
682 			nchars = 7;
683 		else
684 		{
685 			nchars = 0;
686 			(void) sscanf(entry->d_name, "db_%u.%n",
687 						  &tmp_oid, &nchars);
688 			if (nchars <= 0)
689 				continue;
690 			/* %u allows leading whitespace, so reject that */
691 			if (strchr("0123456789", entry->d_name[3]) == NULL)
692 				continue;
693 		}
694 
695 		if (strcmp(entry->d_name + nchars, "tmp") != 0 &&
696 			strcmp(entry->d_name + nchars, "stat") != 0)
697 			continue;
698 
699 		snprintf(fname, sizeof(fname), "%s/%s", directory,
700 				 entry->d_name);
701 		unlink(fname);
702 	}
703 	FreeDir(dir);
704 }
705 
706 /*
707  * pgstat_reset_all() -
708  *
709  * Remove the stats files.  This is currently used only if WAL
710  * recovery is needed after a crash.
711  */
712 void
pgstat_reset_all(void)713 pgstat_reset_all(void)
714 {
715 	pgstat_reset_remove_files(pgstat_stat_directory);
716 	pgstat_reset_remove_files(PGSTAT_STAT_PERMANENT_DIRECTORY);
717 }
718 
#ifdef EXEC_BACKEND

/*
 * pgstat_forkexec() -
 *
 * Build the argument vector for the statistics collector process and hand
 * it to postmaster_forkexec(), whose result is returned unchanged.
 */
static pid_t
pgstat_forkexec(void)
{
	char	   *argv_buf[10];
	int			argc_used;

	argv_buf[0] = "postgres";
	argv_buf[1] = "--forkcol";
	argv_buf[2] = NULL;			/* filled in by postmaster_forkexec */
	argc_used = 3;

	/* terminate the vector and make sure it still fits the array */
	argv_buf[argc_used] = NULL;
	Assert(argc_used < lengthof(argv_buf));

	return postmaster_forkexec(argc_used, argv_buf);
}
#endif							/* EXEC_BACKEND */
742 
743 
744 /*
745  * pgstat_start() -
746  *
747  *	Called from postmaster at startup or after an existing collector
748  *	died.  Attempt to fire up a fresh statistics collector.
749  *
750  *	Returns PID of child process, or 0 if fail.
751  *
752  *	Note: if fail, we will be called again from the postmaster main loop.
753  */
754 int
pgstat_start(void)755 pgstat_start(void)
756 {
757 	time_t		curtime;
758 	pid_t		pgStatPid;
759 
760 	/*
761 	 * Check that the socket is there, else pgstat_init failed and we can do
762 	 * nothing useful.
763 	 */
764 	if (pgStatSock == PGINVALID_SOCKET)
765 		return 0;
766 
767 	/*
768 	 * Do nothing if too soon since last collector start.  This is a safety
769 	 * valve to protect against continuous respawn attempts if the collector
770 	 * is dying immediately at launch.  Note that since we will be re-called
771 	 * from the postmaster main loop, we will get another chance later.
772 	 */
773 	curtime = time(NULL);
774 	if ((unsigned int) (curtime - last_pgstat_start_time) <
775 		(unsigned int) PGSTAT_RESTART_INTERVAL)
776 		return 0;
777 	last_pgstat_start_time = curtime;
778 
779 	/*
780 	 * Okay, fork off the collector.
781 	 */
782 #ifdef EXEC_BACKEND
783 	switch ((pgStatPid = pgstat_forkexec()))
784 #else
785 	switch ((pgStatPid = fork_process()))
786 #endif
787 	{
788 		case -1:
789 			ereport(LOG,
790 					(errmsg("could not fork statistics collector: %m")));
791 			return 0;
792 
793 #ifndef EXEC_BACKEND
794 		case 0:
795 			/* in postmaster child ... */
796 			InitPostmasterChild();
797 
798 			/* Close the postmaster's sockets */
799 			ClosePostmasterPorts(false);
800 
801 			/* Drop our connection to postmaster's shared memory, as well */
802 			dsm_detach_all();
803 			PGSharedMemoryDetach();
804 
805 			PgstatCollectorMain(0, NULL);
806 			break;
807 #endif
808 
809 		default:
810 			return (int) pgStatPid;
811 	}
812 
813 	/* shouldn't get here */
814 	return 0;
815 }
816 
817 void
allow_immediate_pgstat_restart(void)818 allow_immediate_pgstat_restart(void)
819 {
820 	last_pgstat_start_time = 0;
821 }
822 
823 /* ------------------------------------------------------------
824  * Public functions used by backends follow
825  *------------------------------------------------------------
826  */
827 
828 
829 /* ----------
830  * pgstat_report_stat() -
831  *
832  *	Must be called by processes that performs DML: tcop/postgres.c, logical
833  *	receiver processes, SPI worker, etc. to send the so far collected
834  *	per-table and function usage statistics to the collector.  Note that this
835  *	is called only when not within a transaction, so it is fair to use
836  *	transaction stop time as an approximation of current time.
837  *
838  *	"disconnect" is "true" only for the last call before the backend
839  *	exits.  This makes sure that no data is lost and that interrupted
840  *	sessions are reported correctly.
841  * ----------
842  */
843 void
pgstat_report_stat(bool disconnect)844 pgstat_report_stat(bool disconnect)
845 {
846 	/* we assume this inits to all zeroes: */
847 	static const PgStat_TableCounts all_zeroes;
848 	static TimestampTz last_report = 0;
849 
850 	TimestampTz now;
851 	PgStat_MsgTabstat regular_msg;
852 	PgStat_MsgTabstat shared_msg;
853 	TabStatusArray *tsa;
854 	int			i;
855 
856 	/*
857 	 * Don't expend a clock check if nothing to do.
858 	 *
859 	 * To determine whether any WAL activity has occurred since last time, not
860 	 * only the number of generated WAL records but also the numbers of WAL
861 	 * writes and syncs need to be checked. Because even transaction that
862 	 * generates no WAL records can write or sync WAL data when flushing the
863 	 * data pages.
864 	 */
865 	if ((pgStatTabList == NULL || pgStatTabList->tsa_used == 0) &&
866 		pgStatXactCommit == 0 && pgStatXactRollback == 0 &&
867 		pgWalUsage.wal_records == prevWalUsage.wal_records &&
868 		WalStats.m_wal_write == 0 && WalStats.m_wal_sync == 0 &&
869 		!have_function_stats && !disconnect)
870 		return;
871 
872 	/*
873 	 * Don't send a message unless it's been at least PGSTAT_STAT_INTERVAL
874 	 * msec since we last sent one, or the backend is about to exit.
875 	 */
876 	now = GetCurrentTransactionStopTimestamp();
877 	if (!disconnect &&
878 		!TimestampDifferenceExceeds(last_report, now, PGSTAT_STAT_INTERVAL))
879 		return;
880 
881 	last_report = now;
882 
883 	if (disconnect)
884 		pgstat_report_disconnect(MyDatabaseId);
885 
886 	/*
887 	 * Destroy pgStatTabHash before we start invalidating PgStat_TableEntry
888 	 * entries it points to.  (Should we fail partway through the loop below,
889 	 * it's okay to have removed the hashtable already --- the only
890 	 * consequence is we'd get multiple entries for the same table in the
891 	 * pgStatTabList, and that's safe.)
892 	 */
893 	if (pgStatTabHash)
894 		hash_destroy(pgStatTabHash);
895 	pgStatTabHash = NULL;
896 
897 	/*
898 	 * Scan through the TabStatusArray struct(s) to find tables that actually
899 	 * have counts, and build messages to send.  We have to separate shared
900 	 * relations from regular ones because the databaseid field in the message
901 	 * header has to depend on that.
902 	 */
903 	regular_msg.m_databaseid = MyDatabaseId;
904 	shared_msg.m_databaseid = InvalidOid;
905 	regular_msg.m_nentries = 0;
906 	shared_msg.m_nentries = 0;
907 
908 	for (tsa = pgStatTabList; tsa != NULL; tsa = tsa->tsa_next)
909 	{
910 		for (i = 0; i < tsa->tsa_used; i++)
911 		{
912 			PgStat_TableStatus *entry = &tsa->tsa_entries[i];
913 			PgStat_MsgTabstat *this_msg;
914 			PgStat_TableEntry *this_ent;
915 
916 			/* Shouldn't have any pending transaction-dependent counts */
917 			Assert(entry->trans == NULL);
918 
919 			/*
920 			 * Ignore entries that didn't accumulate any actual counts, such
921 			 * as indexes that were opened by the planner but not used.
922 			 */
923 			if (memcmp(&entry->t_counts, &all_zeroes,
924 					   sizeof(PgStat_TableCounts)) == 0)
925 				continue;
926 
927 			/*
928 			 * OK, insert data into the appropriate message, and send if full.
929 			 */
930 			this_msg = entry->t_shared ? &shared_msg : &regular_msg;
931 			this_ent = &this_msg->m_entry[this_msg->m_nentries];
932 			this_ent->t_id = entry->t_id;
933 			memcpy(&this_ent->t_counts, &entry->t_counts,
934 				   sizeof(PgStat_TableCounts));
935 			if (++this_msg->m_nentries >= PGSTAT_NUM_TABENTRIES)
936 			{
937 				pgstat_send_tabstat(this_msg, now);
938 				this_msg->m_nentries = 0;
939 			}
940 		}
941 		/* zero out PgStat_TableStatus structs after use */
942 		MemSet(tsa->tsa_entries, 0,
943 			   tsa->tsa_used * sizeof(PgStat_TableStatus));
944 		tsa->tsa_used = 0;
945 	}
946 
947 	/*
948 	 * Send partial messages.  Make sure that any pending xact commit/abort
949 	 * and connection stats get counted, even if there are no table stats to
950 	 * send.
951 	 */
952 	if (regular_msg.m_nentries > 0 ||
953 		pgStatXactCommit > 0 || pgStatXactRollback > 0 || disconnect)
954 		pgstat_send_tabstat(&regular_msg, now);
955 	if (shared_msg.m_nentries > 0)
956 		pgstat_send_tabstat(&shared_msg, now);
957 
958 	/* Now, send function statistics */
959 	pgstat_send_funcstats();
960 
961 	/* Send WAL statistics */
962 	pgstat_send_wal(true);
963 
964 	/* Finally send SLRU statistics */
965 	pgstat_send_slru();
966 }
967 
968 /*
969  * Subroutine for pgstat_report_stat: finish and send a tabstat message
970  */
971 static void
pgstat_send_tabstat(PgStat_MsgTabstat * tsmsg,TimestampTz now)972 pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg, TimestampTz now)
973 {
974 	int			n;
975 	int			len;
976 
977 	/* It's unlikely we'd get here with no socket, but maybe not impossible */
978 	if (pgStatSock == PGINVALID_SOCKET)
979 		return;
980 
981 	/*
982 	 * Report and reset accumulated xact commit/rollback and I/O timings
983 	 * whenever we send a normal tabstat message
984 	 */
985 	if (OidIsValid(tsmsg->m_databaseid))
986 	{
987 		tsmsg->m_xact_commit = pgStatXactCommit;
988 		tsmsg->m_xact_rollback = pgStatXactRollback;
989 		tsmsg->m_block_read_time = pgStatBlockReadTime;
990 		tsmsg->m_block_write_time = pgStatBlockWriteTime;
991 
992 		if (pgstat_should_report_connstat())
993 		{
994 			long		secs;
995 			int			usecs;
996 
997 			/*
998 			 * pgLastSessionReportTime is initialized to MyStartTimestamp by
999 			 * pgstat_report_connect().
1000 			 */
1001 			TimestampDifference(pgLastSessionReportTime, now, &secs, &usecs);
1002 			pgLastSessionReportTime = now;
1003 			tsmsg->m_session_time = (PgStat_Counter) secs * 1000000 + usecs;
1004 			tsmsg->m_active_time = pgStatActiveTime;
1005 			tsmsg->m_idle_in_xact_time = pgStatTransactionIdleTime;
1006 		}
1007 		else
1008 		{
1009 			tsmsg->m_session_time = 0;
1010 			tsmsg->m_active_time = 0;
1011 			tsmsg->m_idle_in_xact_time = 0;
1012 		}
1013 		pgStatXactCommit = 0;
1014 		pgStatXactRollback = 0;
1015 		pgStatBlockReadTime = 0;
1016 		pgStatBlockWriteTime = 0;
1017 		pgStatActiveTime = 0;
1018 		pgStatTransactionIdleTime = 0;
1019 	}
1020 	else
1021 	{
1022 		tsmsg->m_xact_commit = 0;
1023 		tsmsg->m_xact_rollback = 0;
1024 		tsmsg->m_block_read_time = 0;
1025 		tsmsg->m_block_write_time = 0;
1026 		tsmsg->m_session_time = 0;
1027 		tsmsg->m_active_time = 0;
1028 		tsmsg->m_idle_in_xact_time = 0;
1029 	}
1030 
1031 	n = tsmsg->m_nentries;
1032 	len = offsetof(PgStat_MsgTabstat, m_entry[0]) +
1033 		n * sizeof(PgStat_TableEntry);
1034 
1035 	pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT);
1036 	pgstat_send(tsmsg, len);
1037 }
1038 
1039 /*
1040  * Subroutine for pgstat_report_stat: populate and send a function stat message
1041  */
1042 static void
pgstat_send_funcstats(void)1043 pgstat_send_funcstats(void)
1044 {
1045 	/* we assume this inits to all zeroes: */
1046 	static const PgStat_FunctionCounts all_zeroes;
1047 
1048 	PgStat_MsgFuncstat msg;
1049 	PgStat_BackendFunctionEntry *entry;
1050 	HASH_SEQ_STATUS fstat;
1051 
1052 	if (pgStatFunctions == NULL)
1053 		return;
1054 
1055 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_FUNCSTAT);
1056 	msg.m_databaseid = MyDatabaseId;
1057 	msg.m_nentries = 0;
1058 
1059 	hash_seq_init(&fstat, pgStatFunctions);
1060 	while ((entry = (PgStat_BackendFunctionEntry *) hash_seq_search(&fstat)) != NULL)
1061 	{
1062 		PgStat_FunctionEntry *m_ent;
1063 
1064 		/* Skip it if no counts accumulated since last time */
1065 		if (memcmp(&entry->f_counts, &all_zeroes,
1066 				   sizeof(PgStat_FunctionCounts)) == 0)
1067 			continue;
1068 
1069 		/* need to convert format of time accumulators */
1070 		m_ent = &msg.m_entry[msg.m_nentries];
1071 		m_ent->f_id = entry->f_id;
1072 		m_ent->f_numcalls = entry->f_counts.f_numcalls;
1073 		m_ent->f_total_time = INSTR_TIME_GET_MICROSEC(entry->f_counts.f_total_time);
1074 		m_ent->f_self_time = INSTR_TIME_GET_MICROSEC(entry->f_counts.f_self_time);
1075 
1076 		if (++msg.m_nentries >= PGSTAT_NUM_FUNCENTRIES)
1077 		{
1078 			pgstat_send(&msg, offsetof(PgStat_MsgFuncstat, m_entry[0]) +
1079 						msg.m_nentries * sizeof(PgStat_FunctionEntry));
1080 			msg.m_nentries = 0;
1081 		}
1082 
1083 		/* reset the entry's counts */
1084 		MemSet(&entry->f_counts, 0, sizeof(PgStat_FunctionCounts));
1085 	}
1086 
1087 	if (msg.m_nentries > 0)
1088 		pgstat_send(&msg, offsetof(PgStat_MsgFuncstat, m_entry[0]) +
1089 					msg.m_nentries * sizeof(PgStat_FunctionEntry));
1090 
1091 	have_function_stats = false;
1092 }
1093 
1094 
1095 /* ----------
1096  * pgstat_vacuum_stat() -
1097  *
1098  *	Will tell the collector about objects he can get rid of.
1099  * ----------
1100  */
1101 void
pgstat_vacuum_stat(void)1102 pgstat_vacuum_stat(void)
1103 {
1104 	HTAB	   *htab;
1105 	PgStat_MsgTabpurge msg;
1106 	PgStat_MsgFuncpurge f_msg;
1107 	HASH_SEQ_STATUS hstat;
1108 	PgStat_StatDBEntry *dbentry;
1109 	PgStat_StatTabEntry *tabentry;
1110 	PgStat_StatFuncEntry *funcentry;
1111 	int			len;
1112 
1113 	if (pgStatSock == PGINVALID_SOCKET)
1114 		return;
1115 
1116 	/*
1117 	 * If not done for this transaction, read the statistics collector stats
1118 	 * file into some hash tables.
1119 	 */
1120 	backend_read_statsfile();
1121 
1122 	/*
1123 	 * Read pg_database and make a list of OIDs of all existing databases
1124 	 */
1125 	htab = pgstat_collect_oids(DatabaseRelationId, Anum_pg_database_oid);
1126 
1127 	/*
1128 	 * Search the database hash table for dead databases and tell the
1129 	 * collector to drop them.
1130 	 */
1131 	hash_seq_init(&hstat, pgStatDBHash);
1132 	while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
1133 	{
1134 		Oid			dbid = dbentry->databaseid;
1135 
1136 		CHECK_FOR_INTERRUPTS();
1137 
1138 		/* the DB entry for shared tables (with InvalidOid) is never dropped */
1139 		if (OidIsValid(dbid) &&
1140 			hash_search(htab, (void *) &dbid, HASH_FIND, NULL) == NULL)
1141 			pgstat_drop_database(dbid);
1142 	}
1143 
1144 	/* Clean up */
1145 	hash_destroy(htab);
1146 
1147 	/*
1148 	 * Search for all the dead replication slots in stats hashtable and tell
1149 	 * the stats collector to drop them.
1150 	 */
1151 	if (replSlotStatHash)
1152 	{
1153 		PgStat_StatReplSlotEntry *slotentry;
1154 
1155 		hash_seq_init(&hstat, replSlotStatHash);
1156 		while ((slotentry = (PgStat_StatReplSlotEntry *) hash_seq_search(&hstat)) != NULL)
1157 		{
1158 			CHECK_FOR_INTERRUPTS();
1159 
1160 			if (SearchNamedReplicationSlot(NameStr(slotentry->slotname), true) == NULL)
1161 				pgstat_report_replslot_drop(NameStr(slotentry->slotname));
1162 		}
1163 	}
1164 
1165 	/*
1166 	 * Lookup our own database entry; if not found, nothing more to do.
1167 	 */
1168 	dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
1169 												 (void *) &MyDatabaseId,
1170 												 HASH_FIND, NULL);
1171 	if (dbentry == NULL || dbentry->tables == NULL)
1172 		return;
1173 
1174 	/*
1175 	 * Similarly to above, make a list of all known relations in this DB.
1176 	 */
1177 	htab = pgstat_collect_oids(RelationRelationId, Anum_pg_class_oid);
1178 
1179 	/*
1180 	 * Initialize our messages table counter to zero
1181 	 */
1182 	msg.m_nentries = 0;
1183 
1184 	/*
1185 	 * Check for all tables listed in stats hashtable if they still exist.
1186 	 */
1187 	hash_seq_init(&hstat, dbentry->tables);
1188 	while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&hstat)) != NULL)
1189 	{
1190 		Oid			tabid = tabentry->tableid;
1191 
1192 		CHECK_FOR_INTERRUPTS();
1193 
1194 		if (hash_search(htab, (void *) &tabid, HASH_FIND, NULL) != NULL)
1195 			continue;
1196 
1197 		/*
1198 		 * Not there, so add this table's Oid to the message
1199 		 */
1200 		msg.m_tableid[msg.m_nentries++] = tabid;
1201 
1202 		/*
1203 		 * If the message is full, send it out and reinitialize to empty
1204 		 */
1205 		if (msg.m_nentries >= PGSTAT_NUM_TABPURGE)
1206 		{
1207 			len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
1208 				+ msg.m_nentries * sizeof(Oid);
1209 
1210 			pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
1211 			msg.m_databaseid = MyDatabaseId;
1212 			pgstat_send(&msg, len);
1213 
1214 			msg.m_nentries = 0;
1215 		}
1216 	}
1217 
1218 	/*
1219 	 * Send the rest
1220 	 */
1221 	if (msg.m_nentries > 0)
1222 	{
1223 		len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
1224 			+ msg.m_nentries * sizeof(Oid);
1225 
1226 		pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
1227 		msg.m_databaseid = MyDatabaseId;
1228 		pgstat_send(&msg, len);
1229 	}
1230 
1231 	/* Clean up */
1232 	hash_destroy(htab);
1233 
1234 	/*
1235 	 * Now repeat the above steps for functions.  However, we needn't bother
1236 	 * in the common case where no function stats are being collected.
1237 	 */
1238 	if (dbentry->functions != NULL &&
1239 		hash_get_num_entries(dbentry->functions) > 0)
1240 	{
1241 		htab = pgstat_collect_oids(ProcedureRelationId, Anum_pg_proc_oid);
1242 
1243 		pgstat_setheader(&f_msg.m_hdr, PGSTAT_MTYPE_FUNCPURGE);
1244 		f_msg.m_databaseid = MyDatabaseId;
1245 		f_msg.m_nentries = 0;
1246 
1247 		hash_seq_init(&hstat, dbentry->functions);
1248 		while ((funcentry = (PgStat_StatFuncEntry *) hash_seq_search(&hstat)) != NULL)
1249 		{
1250 			Oid			funcid = funcentry->functionid;
1251 
1252 			CHECK_FOR_INTERRUPTS();
1253 
1254 			if (hash_search(htab, (void *) &funcid, HASH_FIND, NULL) != NULL)
1255 				continue;
1256 
1257 			/*
1258 			 * Not there, so add this function's Oid to the message
1259 			 */
1260 			f_msg.m_functionid[f_msg.m_nentries++] = funcid;
1261 
1262 			/*
1263 			 * If the message is full, send it out and reinitialize to empty
1264 			 */
1265 			if (f_msg.m_nentries >= PGSTAT_NUM_FUNCPURGE)
1266 			{
1267 				len = offsetof(PgStat_MsgFuncpurge, m_functionid[0])
1268 					+ f_msg.m_nentries * sizeof(Oid);
1269 
1270 				pgstat_send(&f_msg, len);
1271 
1272 				f_msg.m_nentries = 0;
1273 			}
1274 		}
1275 
1276 		/*
1277 		 * Send the rest
1278 		 */
1279 		if (f_msg.m_nentries > 0)
1280 		{
1281 			len = offsetof(PgStat_MsgFuncpurge, m_functionid[0])
1282 				+ f_msg.m_nentries * sizeof(Oid);
1283 
1284 			pgstat_send(&f_msg, len);
1285 		}
1286 
1287 		hash_destroy(htab);
1288 	}
1289 }
1290 
1291 
1292 /* ----------
1293  * pgstat_collect_oids() -
1294  *
1295  *	Collect the OIDs of all objects listed in the specified system catalog
1296  *	into a temporary hash table.  Caller should hash_destroy the result
1297  *	when done with it.  (However, we make the table in CurrentMemoryContext
1298  *	so that it will be freed properly in event of an error.)
1299  * ----------
1300  */
1301 static HTAB *
pgstat_collect_oids(Oid catalogid,AttrNumber anum_oid)1302 pgstat_collect_oids(Oid catalogid, AttrNumber anum_oid)
1303 {
1304 	HTAB	   *htab;
1305 	HASHCTL		hash_ctl;
1306 	Relation	rel;
1307 	TableScanDesc scan;
1308 	HeapTuple	tup;
1309 	Snapshot	snapshot;
1310 
1311 	hash_ctl.keysize = sizeof(Oid);
1312 	hash_ctl.entrysize = sizeof(Oid);
1313 	hash_ctl.hcxt = CurrentMemoryContext;
1314 	htab = hash_create("Temporary table of OIDs",
1315 					   PGSTAT_TAB_HASH_SIZE,
1316 					   &hash_ctl,
1317 					   HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
1318 
1319 	rel = table_open(catalogid, AccessShareLock);
1320 	snapshot = RegisterSnapshot(GetLatestSnapshot());
1321 	scan = table_beginscan(rel, snapshot, 0, NULL);
1322 	while ((tup = heap_getnext(scan, ForwardScanDirection)) != NULL)
1323 	{
1324 		Oid			thisoid;
1325 		bool		isnull;
1326 
1327 		thisoid = heap_getattr(tup, anum_oid, RelationGetDescr(rel), &isnull);
1328 		Assert(!isnull);
1329 
1330 		CHECK_FOR_INTERRUPTS();
1331 
1332 		(void) hash_search(htab, (void *) &thisoid, HASH_ENTER, NULL);
1333 	}
1334 	table_endscan(scan);
1335 	UnregisterSnapshot(snapshot);
1336 	table_close(rel, AccessShareLock);
1337 
1338 	return htab;
1339 }
1340 
1341 
1342 /* ----------
1343  * pgstat_drop_database() -
1344  *
1345  *	Tell the collector that we just dropped a database.
1346  *	(If the message gets lost, we will still clean the dead DB eventually
1347  *	via future invocations of pgstat_vacuum_stat().)
1348  * ----------
1349  */
1350 void
pgstat_drop_database(Oid databaseid)1351 pgstat_drop_database(Oid databaseid)
1352 {
1353 	PgStat_MsgDropdb msg;
1354 
1355 	if (pgStatSock == PGINVALID_SOCKET)
1356 		return;
1357 
1358 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DROPDB);
1359 	msg.m_databaseid = databaseid;
1360 	pgstat_send(&msg, sizeof(msg));
1361 }
1362 
1363 
/* ----------
 * pgstat_drop_relation() -
 *
 *	Send the collector a single-entry PGSTAT_MTYPE_TABPURGE message for
 *	"relid" in our own database, i.e. tell it that this relation was just
 *	dropped.
 *	(Should the message get lost, the dead entry is still cleaned up
 *	eventually by a later invocation of pgstat_vacuum_stat().)
 *
 *	Compiled out (NOT_USED): there is currently no good place to call it
 *	from, so stats for dead rels are cleaned out by pgstat_vacuum_stat()
 *	alone.
 * ----------
 */
#ifdef NOT_USED
void
pgstat_drop_relation(Oid relid)
{
	PgStat_MsgTabpurge purgemsg;
	int			msglen;

	if (pgStatSock == PGINVALID_SOCKET)
		return;

	/* Header plus exactly one table OID */
	msglen = offsetof(PgStat_MsgTabpurge, m_tableid[0]) + sizeof(Oid);

	pgstat_setheader(&purgemsg.m_hdr, PGSTAT_MTYPE_TABPURGE);
	purgemsg.m_databaseid = MyDatabaseId;
	purgemsg.m_nentries = 1;
	purgemsg.m_tableid[0] = relid;

	pgstat_send(&purgemsg, msglen);
}
#endif							/* NOT_USED */
1395 
1396 /* ----------
1397  * pgstat_reset_counters() -
1398  *
1399  *	Tell the statistics collector to reset counters for our database.
1400  *
1401  *	Permission checking for this function is managed through the normal
1402  *	GRANT system.
1403  * ----------
1404  */
1405 void
pgstat_reset_counters(void)1406 pgstat_reset_counters(void)
1407 {
1408 	PgStat_MsgResetcounter msg;
1409 
1410 	if (pgStatSock == PGINVALID_SOCKET)
1411 		return;
1412 
1413 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETCOUNTER);
1414 	msg.m_databaseid = MyDatabaseId;
1415 	pgstat_send(&msg, sizeof(msg));
1416 }
1417 
1418 /* ----------
1419  * pgstat_reset_shared_counters() -
1420  *
1421  *	Tell the statistics collector to reset cluster-wide shared counters.
1422  *
1423  *	Permission checking for this function is managed through the normal
1424  *	GRANT system.
1425  * ----------
1426  */
1427 void
pgstat_reset_shared_counters(const char * target)1428 pgstat_reset_shared_counters(const char *target)
1429 {
1430 	PgStat_MsgResetsharedcounter msg;
1431 
1432 	if (pgStatSock == PGINVALID_SOCKET)
1433 		return;
1434 
1435 	if (strcmp(target, "archiver") == 0)
1436 		msg.m_resettarget = RESET_ARCHIVER;
1437 	else if (strcmp(target, "bgwriter") == 0)
1438 		msg.m_resettarget = RESET_BGWRITER;
1439 	else if (strcmp(target, "wal") == 0)
1440 		msg.m_resettarget = RESET_WAL;
1441 	else
1442 		ereport(ERROR,
1443 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1444 				 errmsg("unrecognized reset target: \"%s\"", target),
1445 				 errhint("Target must be \"archiver\", \"bgwriter\", or \"wal\".")));
1446 
1447 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSHAREDCOUNTER);
1448 	pgstat_send(&msg, sizeof(msg));
1449 }
1450 
1451 /* ----------
1452  * pgstat_reset_single_counter() -
1453  *
1454  *	Tell the statistics collector to reset a single counter.
1455  *
1456  *	Permission checking for this function is managed through the normal
1457  *	GRANT system.
1458  * ----------
1459  */
1460 void
pgstat_reset_single_counter(Oid objoid,PgStat_Single_Reset_Type type)1461 pgstat_reset_single_counter(Oid objoid, PgStat_Single_Reset_Type type)
1462 {
1463 	PgStat_MsgResetsinglecounter msg;
1464 
1465 	if (pgStatSock == PGINVALID_SOCKET)
1466 		return;
1467 
1468 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSINGLECOUNTER);
1469 	msg.m_databaseid = MyDatabaseId;
1470 	msg.m_resettype = type;
1471 	msg.m_objectid = objoid;
1472 
1473 	pgstat_send(&msg, sizeof(msg));
1474 }
1475 
1476 /* ----------
1477  * pgstat_reset_slru_counter() -
1478  *
1479  *	Tell the statistics collector to reset a single SLRU counter, or all
1480  *	SLRU counters (when name is null).
1481  *
1482  *	Permission checking for this function is managed through the normal
1483  *	GRANT system.
1484  * ----------
1485  */
1486 void
pgstat_reset_slru_counter(const char * name)1487 pgstat_reset_slru_counter(const char *name)
1488 {
1489 	PgStat_MsgResetslrucounter msg;
1490 
1491 	if (pgStatSock == PGINVALID_SOCKET)
1492 		return;
1493 
1494 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSLRUCOUNTER);
1495 	msg.m_index = (name) ? pgstat_slru_index(name) : -1;
1496 
1497 	pgstat_send(&msg, sizeof(msg));
1498 }
1499 
1500 /* ----------
1501  * pgstat_reset_replslot_counter() -
1502  *
1503  *	Tell the statistics collector to reset a single replication slot
1504  *	counter, or all replication slots counters (when name is null).
1505  *
1506  *	Permission checking for this function is managed through the normal
1507  *	GRANT system.
1508  * ----------
1509  */
1510 void
pgstat_reset_replslot_counter(const char * name)1511 pgstat_reset_replslot_counter(const char *name)
1512 {
1513 	PgStat_MsgResetreplslotcounter msg;
1514 
1515 	if (pgStatSock == PGINVALID_SOCKET)
1516 		return;
1517 
1518 	if (name)
1519 	{
1520 		namestrcpy(&msg.m_slotname, name);
1521 		msg.clearall = false;
1522 	}
1523 	else
1524 		msg.clearall = true;
1525 
1526 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETREPLSLOTCOUNTER);
1527 
1528 	pgstat_send(&msg, sizeof(msg));
1529 }
1530 
1531 /* ----------
1532  * pgstat_report_autovac() -
1533  *
1534  *	Called from autovacuum.c to report startup of an autovacuum process.
1535  *	We are called before InitPostgres is done, so can't rely on MyDatabaseId;
1536  *	the db OID must be passed in, instead.
1537  * ----------
1538  */
1539 void
pgstat_report_autovac(Oid dboid)1540 pgstat_report_autovac(Oid dboid)
1541 {
1542 	PgStat_MsgAutovacStart msg;
1543 
1544 	if (pgStatSock == PGINVALID_SOCKET)
1545 		return;
1546 
1547 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_AUTOVAC_START);
1548 	msg.m_databaseid = dboid;
1549 	msg.m_start_time = GetCurrentTimestamp();
1550 
1551 	pgstat_send(&msg, sizeof(msg));
1552 }
1553 
1554 
1555 /* ---------
1556  * pgstat_report_vacuum() -
1557  *
1558  *	Tell the collector about the table we just vacuumed.
1559  * ---------
1560  */
1561 void
pgstat_report_vacuum(Oid tableoid,bool shared,PgStat_Counter livetuples,PgStat_Counter deadtuples)1562 pgstat_report_vacuum(Oid tableoid, bool shared,
1563 					 PgStat_Counter livetuples, PgStat_Counter deadtuples)
1564 {
1565 	PgStat_MsgVacuum msg;
1566 
1567 	if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
1568 		return;
1569 
1570 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_VACUUM);
1571 	msg.m_databaseid = shared ? InvalidOid : MyDatabaseId;
1572 	msg.m_tableoid = tableoid;
1573 	msg.m_autovacuum = IsAutoVacuumWorkerProcess();
1574 	msg.m_vacuumtime = GetCurrentTimestamp();
1575 	msg.m_live_tuples = livetuples;
1576 	msg.m_dead_tuples = deadtuples;
1577 	pgstat_send(&msg, sizeof(msg));
1578 }
1579 
1580 /* --------
1581  * pgstat_report_analyze() -
1582  *
1583  *	Tell the collector about the table we just analyzed.
1584  *
1585  * Caller must provide new live- and dead-tuples estimates, as well as a
1586  * flag indicating whether to reset the changes_since_analyze counter.
1587  * --------
1588  */
1589 void
pgstat_report_analyze(Relation rel,PgStat_Counter livetuples,PgStat_Counter deadtuples,bool resetcounter)1590 pgstat_report_analyze(Relation rel,
1591 					  PgStat_Counter livetuples, PgStat_Counter deadtuples,
1592 					  bool resetcounter)
1593 {
1594 	PgStat_MsgAnalyze msg;
1595 
1596 	if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
1597 		return;
1598 
1599 	/*
1600 	 * Unlike VACUUM, ANALYZE might be running inside a transaction that has
1601 	 * already inserted and/or deleted rows in the target table. ANALYZE will
1602 	 * have counted such rows as live or dead respectively. Because we will
1603 	 * report our counts of such rows at transaction end, we should subtract
1604 	 * off these counts from what we send to the collector now, else they'll
1605 	 * be double-counted after commit.  (This approach also ensures that the
1606 	 * collector ends up with the right numbers if we abort instead of
1607 	 * committing.)
1608 	 *
1609 	 * Waste no time on partitioned tables, though.
1610 	 */
1611 	if (rel->pgstat_info != NULL &&
1612 		rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1613 	{
1614 		PgStat_TableXactStatus *trans;
1615 
1616 		for (trans = rel->pgstat_info->trans; trans; trans = trans->upper)
1617 		{
1618 			livetuples -= trans->tuples_inserted - trans->tuples_deleted;
1619 			deadtuples -= trans->tuples_updated + trans->tuples_deleted;
1620 		}
1621 		/* count stuff inserted by already-aborted subxacts, too */
1622 		deadtuples -= rel->pgstat_info->t_counts.t_delta_dead_tuples;
1623 		/* Since ANALYZE's counts are estimates, we could have underflowed */
1624 		livetuples = Max(livetuples, 0);
1625 		deadtuples = Max(deadtuples, 0);
1626 	}
1627 
1628 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ANALYZE);
1629 	msg.m_databaseid = rel->rd_rel->relisshared ? InvalidOid : MyDatabaseId;
1630 	msg.m_tableoid = RelationGetRelid(rel);
1631 	msg.m_autovacuum = IsAutoVacuumWorkerProcess();
1632 	msg.m_resetcounter = resetcounter;
1633 	msg.m_analyzetime = GetCurrentTimestamp();
1634 	msg.m_live_tuples = livetuples;
1635 	msg.m_dead_tuples = deadtuples;
1636 	pgstat_send(&msg, sizeof(msg));
1637 }
1638 
1639 /* --------
1640  * pgstat_report_recovery_conflict() -
1641  *
1642  *	Tell the collector about a Hot Standby recovery conflict.
1643  * --------
1644  */
1645 void
pgstat_report_recovery_conflict(int reason)1646 pgstat_report_recovery_conflict(int reason)
1647 {
1648 	PgStat_MsgRecoveryConflict msg;
1649 
1650 	if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
1651 		return;
1652 
1653 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RECOVERYCONFLICT);
1654 	msg.m_databaseid = MyDatabaseId;
1655 	msg.m_reason = reason;
1656 	pgstat_send(&msg, sizeof(msg));
1657 }
1658 
1659 /* --------
1660  * pgstat_report_deadlock() -
1661  *
1662  *	Tell the collector about a deadlock detected.
1663  * --------
1664  */
1665 void
pgstat_report_deadlock(void)1666 pgstat_report_deadlock(void)
1667 {
1668 	PgStat_MsgDeadlock msg;
1669 
1670 	if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
1671 		return;
1672 
1673 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DEADLOCK);
1674 	msg.m_databaseid = MyDatabaseId;
1675 	pgstat_send(&msg, sizeof(msg));
1676 }
1677 
1678 
1679 
1680 /* --------
1681  * pgstat_report_checksum_failures_in_db() -
1682  *
1683  *	Tell the collector about one or more checksum failures.
1684  * --------
1685  */
1686 void
pgstat_report_checksum_failures_in_db(Oid dboid,int failurecount)1687 pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount)
1688 {
1689 	PgStat_MsgChecksumFailure msg;
1690 
1691 	if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
1692 		return;
1693 
1694 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_CHECKSUMFAILURE);
1695 	msg.m_databaseid = dboid;
1696 	msg.m_failurecount = failurecount;
1697 	msg.m_failure_time = GetCurrentTimestamp();
1698 
1699 	pgstat_send(&msg, sizeof(msg));
1700 }
1701 
1702 /* --------
1703  * pgstat_report_checksum_failure() -
1704  *
1705  *	Tell the collector about a checksum failure.
1706  * --------
1707  */
1708 void
pgstat_report_checksum_failure(void)1709 pgstat_report_checksum_failure(void)
1710 {
1711 	pgstat_report_checksum_failures_in_db(MyDatabaseId, 1);
1712 }
1713 
1714 /* --------
1715  * pgstat_report_tempfile() -
1716  *
1717  *	Tell the collector about a temporary file.
1718  * --------
1719  */
1720 void
pgstat_report_tempfile(size_t filesize)1721 pgstat_report_tempfile(size_t filesize)
1722 {
1723 	PgStat_MsgTempFile msg;
1724 
1725 	if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
1726 		return;
1727 
1728 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TEMPFILE);
1729 	msg.m_databaseid = MyDatabaseId;
1730 	msg.m_filesize = filesize;
1731 	pgstat_send(&msg, sizeof(msg));
1732 }
1733 
1734 /* --------
1735  * pgstat_report_connect() -
1736  *
1737  *	Tell the collector about a new connection.
1738  * --------
1739  */
1740 void
pgstat_report_connect(Oid dboid)1741 pgstat_report_connect(Oid dboid)
1742 {
1743 	PgStat_MsgConnect msg;
1744 
1745 	if (!pgstat_should_report_connstat())
1746 		return;
1747 
1748 	pgLastSessionReportTime = MyStartTimestamp;
1749 
1750 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_CONNECT);
1751 	msg.m_databaseid = MyDatabaseId;
1752 	pgstat_send(&msg, sizeof(PgStat_MsgConnect));
1753 }
1754 
1755 /* --------
1756  * pgstat_report_disconnect() -
1757  *
1758  *	Tell the collector about a disconnect.
1759  * --------
1760  */
1761 static void
pgstat_report_disconnect(Oid dboid)1762 pgstat_report_disconnect(Oid dboid)
1763 {
1764 	PgStat_MsgDisconnect msg;
1765 
1766 	if (!pgstat_should_report_connstat())
1767 		return;
1768 
1769 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DISCONNECT);
1770 	msg.m_databaseid = MyDatabaseId;
1771 	msg.m_cause = pgStatSessionEndCause;
1772 	pgstat_send(&msg, sizeof(PgStat_MsgDisconnect));
1773 }
1774 
1775 /* --------
1776  * pgstat_should_report_connstats() -
1777  *
1778  *	We report session statistics only for normal backend processes.  Parallel
1779  *	workers run in parallel, so they don't contribute to session times, even
1780  *	though they use CPU time. Walsender processes could be considered here,
1781  *	but they have different session characteristics from normal backends (for
1782  *	example, they are always "active"), so they would skew session statistics.
1783  * ----------
1784  */
1785 static bool
pgstat_should_report_connstat(void)1786 pgstat_should_report_connstat(void)
1787 {
1788 	return MyBackendType == B_BACKEND;
1789 }
1790 
1791 /* ----------
1792  * pgstat_report_replslot() -
1793  *
1794  *	Tell the collector about replication slot statistics.
1795  * ----------
1796  */
1797 void
pgstat_report_replslot(const PgStat_StatReplSlotEntry * repSlotStat)1798 pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat)
1799 {
1800 	PgStat_MsgReplSlot msg;
1801 
1802 	/*
1803 	 * Prepare and send the message
1804 	 */
1805 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
1806 	namestrcpy(&msg.m_slotname, NameStr(repSlotStat->slotname));
1807 	msg.m_create = false;
1808 	msg.m_drop = false;
1809 	msg.m_spill_txns = repSlotStat->spill_txns;
1810 	msg.m_spill_count = repSlotStat->spill_count;
1811 	msg.m_spill_bytes = repSlotStat->spill_bytes;
1812 	msg.m_stream_txns = repSlotStat->stream_txns;
1813 	msg.m_stream_count = repSlotStat->stream_count;
1814 	msg.m_stream_bytes = repSlotStat->stream_bytes;
1815 	msg.m_total_txns = repSlotStat->total_txns;
1816 	msg.m_total_bytes = repSlotStat->total_bytes;
1817 	pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
1818 }
1819 
1820 /* ----------
1821  * pgstat_report_replslot_create() -
1822  *
1823  *	Tell the collector about creating the replication slot.
1824  * ----------
1825  */
1826 void
pgstat_report_replslot_create(const char * slotname)1827 pgstat_report_replslot_create(const char *slotname)
1828 {
1829 	PgStat_MsgReplSlot msg;
1830 
1831 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
1832 	namestrcpy(&msg.m_slotname, slotname);
1833 	msg.m_create = true;
1834 	msg.m_drop = false;
1835 	pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
1836 }
1837 
1838 /* ----------
1839  * pgstat_report_replslot_drop() -
1840  *
1841  *	Tell the collector about dropping the replication slot.
1842  * ----------
1843  */
1844 void
pgstat_report_replslot_drop(const char * slotname)1845 pgstat_report_replslot_drop(const char *slotname)
1846 {
1847 	PgStat_MsgReplSlot msg;
1848 
1849 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
1850 	namestrcpy(&msg.m_slotname, slotname);
1851 	msg.m_create = false;
1852 	msg.m_drop = true;
1853 	pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
1854 }
1855 
1856 /* ----------
1857  * pgstat_ping() -
1858  *
1859  *	Send some junk data to the collector to increase traffic.
1860  * ----------
1861  */
1862 void
pgstat_ping(void)1863 pgstat_ping(void)
1864 {
1865 	PgStat_MsgDummy msg;
1866 
1867 	if (pgStatSock == PGINVALID_SOCKET)
1868 		return;
1869 
1870 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DUMMY);
1871 	pgstat_send(&msg, sizeof(msg));
1872 }
1873 
1874 /* ----------
1875  * pgstat_send_inquiry() -
1876  *
1877  *	Notify collector that we need fresh data.
1878  * ----------
1879  */
1880 static void
pgstat_send_inquiry(TimestampTz clock_time,TimestampTz cutoff_time,Oid databaseid)1881 pgstat_send_inquiry(TimestampTz clock_time, TimestampTz cutoff_time, Oid databaseid)
1882 {
1883 	PgStat_MsgInquiry msg;
1884 
1885 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_INQUIRY);
1886 	msg.clock_time = clock_time;
1887 	msg.cutoff_time = cutoff_time;
1888 	msg.databaseid = databaseid;
1889 	pgstat_send(&msg, sizeof(msg));
1890 }
1891 
1892 
1893 /*
1894  * Initialize function call usage data.
1895  * Called by the executor before invoking a function.
1896  */
1897 void
pgstat_init_function_usage(FunctionCallInfo fcinfo,PgStat_FunctionCallUsage * fcu)1898 pgstat_init_function_usage(FunctionCallInfo fcinfo,
1899 						   PgStat_FunctionCallUsage *fcu)
1900 {
1901 	PgStat_BackendFunctionEntry *htabent;
1902 	bool		found;
1903 
1904 	if (pgstat_track_functions <= fcinfo->flinfo->fn_stats)
1905 	{
1906 		/* stats not wanted */
1907 		fcu->fs = NULL;
1908 		return;
1909 	}
1910 
1911 	if (!pgStatFunctions)
1912 	{
1913 		/* First time through - initialize function stat table */
1914 		HASHCTL		hash_ctl;
1915 
1916 		hash_ctl.keysize = sizeof(Oid);
1917 		hash_ctl.entrysize = sizeof(PgStat_BackendFunctionEntry);
1918 		pgStatFunctions = hash_create("Function stat entries",
1919 									  PGSTAT_FUNCTION_HASH_SIZE,
1920 									  &hash_ctl,
1921 									  HASH_ELEM | HASH_BLOBS);
1922 	}
1923 
1924 	/* Get the stats entry for this function, create if necessary */
1925 	htabent = hash_search(pgStatFunctions, &fcinfo->flinfo->fn_oid,
1926 						  HASH_ENTER, &found);
1927 	if (!found)
1928 		MemSet(&htabent->f_counts, 0, sizeof(PgStat_FunctionCounts));
1929 
1930 	fcu->fs = &htabent->f_counts;
1931 
1932 	/* save stats for this function, later used to compensate for recursion */
1933 	fcu->save_f_total_time = htabent->f_counts.f_total_time;
1934 
1935 	/* save current backend-wide total time */
1936 	fcu->save_total = total_func_time;
1937 
1938 	/* get clock time as of function start */
1939 	INSTR_TIME_SET_CURRENT(fcu->f_start);
1940 }
1941 
1942 /*
1943  * find_funcstat_entry - find any existing PgStat_BackendFunctionEntry entry
1944  *		for specified function
1945  *
1946  * If no entry, return NULL, don't create a new one
1947  */
1948 PgStat_BackendFunctionEntry *
find_funcstat_entry(Oid func_id)1949 find_funcstat_entry(Oid func_id)
1950 {
1951 	if (pgStatFunctions == NULL)
1952 		return NULL;
1953 
1954 	return (PgStat_BackendFunctionEntry *) hash_search(pgStatFunctions,
1955 													   (void *) &func_id,
1956 													   HASH_FIND, NULL);
1957 }
1958 
1959 /*
1960  * Calculate function call usage and update stat counters.
1961  * Called by the executor after invoking a function.
1962  *
1963  * In the case of a set-returning function that runs in value-per-call mode,
1964  * we will see multiple pgstat_init_function_usage/pgstat_end_function_usage
1965  * calls for what the user considers a single call of the function.  The
1966  * finalize flag should be TRUE on the last call.
1967  */
1968 void
pgstat_end_function_usage(PgStat_FunctionCallUsage * fcu,bool finalize)1969 pgstat_end_function_usage(PgStat_FunctionCallUsage *fcu, bool finalize)
1970 {
1971 	PgStat_FunctionCounts *fs = fcu->fs;
1972 	instr_time	f_total;
1973 	instr_time	f_others;
1974 	instr_time	f_self;
1975 
1976 	/* stats not wanted? */
1977 	if (fs == NULL)
1978 		return;
1979 
1980 	/* total elapsed time in this function call */
1981 	INSTR_TIME_SET_CURRENT(f_total);
1982 	INSTR_TIME_SUBTRACT(f_total, fcu->f_start);
1983 
1984 	/* self usage: elapsed minus anything already charged to other calls */
1985 	f_others = total_func_time;
1986 	INSTR_TIME_SUBTRACT(f_others, fcu->save_total);
1987 	f_self = f_total;
1988 	INSTR_TIME_SUBTRACT(f_self, f_others);
1989 
1990 	/* update backend-wide total time */
1991 	INSTR_TIME_ADD(total_func_time, f_self);
1992 
1993 	/*
1994 	 * Compute the new f_total_time as the total elapsed time added to the
1995 	 * pre-call value of f_total_time.  This is necessary to avoid
1996 	 * double-counting any time taken by recursive calls of myself.  (We do
1997 	 * not need any similar kluge for self time, since that already excludes
1998 	 * any recursive calls.)
1999 	 */
2000 	INSTR_TIME_ADD(f_total, fcu->save_f_total_time);
2001 
2002 	/* update counters in function stats table */
2003 	if (finalize)
2004 		fs->f_numcalls++;
2005 	fs->f_total_time = f_total;
2006 	INSTR_TIME_ADD(fs->f_self_time, f_self);
2007 
2008 	/* indicate that we have something to send */
2009 	have_function_stats = true;
2010 }
2011 
2012 
2013 /* ----------
2014  * pgstat_initstats() -
2015  *
2016  *	Initialize a relcache entry to count access statistics.
2017  *	Called whenever a relation is opened.
2018  *
2019  *	We assume that a relcache entry's pgstat_info field is zeroed by
2020  *	relcache.c when the relcache entry is made; thereafter it is long-lived
2021  *	data.  We can avoid repeated searches of the TabStatus arrays when the
2022  *	same relation is touched repeatedly within a transaction.
2023  * ----------
2024  */
2025 void
pgstat_initstats(Relation rel)2026 pgstat_initstats(Relation rel)
2027 {
2028 	Oid			rel_id = rel->rd_id;
2029 	char		relkind = rel->rd_rel->relkind;
2030 
2031 	/*
2032 	 * We only count stats for relations with storage and partitioned tables
2033 	 */
2034 	if (!RELKIND_HAS_STORAGE(relkind) && relkind != RELKIND_PARTITIONED_TABLE)
2035 	{
2036 		rel->pgstat_info = NULL;
2037 		return;
2038 	}
2039 
2040 	if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
2041 	{
2042 		/* We're not counting at all */
2043 		rel->pgstat_info = NULL;
2044 		return;
2045 	}
2046 
2047 	/*
2048 	 * If we already set up this relation in the current transaction, nothing
2049 	 * to do.
2050 	 */
2051 	if (rel->pgstat_info != NULL &&
2052 		rel->pgstat_info->t_id == rel_id)
2053 		return;
2054 
2055 	/* Else find or make the PgStat_TableStatus entry, and update link */
2056 	rel->pgstat_info = get_tabstat_entry(rel_id, rel->rd_rel->relisshared);
2057 }
2058 
2059 /*
2060  * get_tabstat_entry - find or create a PgStat_TableStatus entry for rel
2061  */
2062 static PgStat_TableStatus *
get_tabstat_entry(Oid rel_id,bool isshared)2063 get_tabstat_entry(Oid rel_id, bool isshared)
2064 {
2065 	TabStatHashEntry *hash_entry;
2066 	PgStat_TableStatus *entry;
2067 	TabStatusArray *tsa;
2068 	bool		found;
2069 
2070 	/*
2071 	 * Create hash table if we don't have it already.
2072 	 */
2073 	if (pgStatTabHash == NULL)
2074 	{
2075 		HASHCTL		ctl;
2076 
2077 		ctl.keysize = sizeof(Oid);
2078 		ctl.entrysize = sizeof(TabStatHashEntry);
2079 
2080 		pgStatTabHash = hash_create("pgstat TabStatusArray lookup hash table",
2081 									TABSTAT_QUANTUM,
2082 									&ctl,
2083 									HASH_ELEM | HASH_BLOBS);
2084 	}
2085 
2086 	/*
2087 	 * Find an entry or create a new one.
2088 	 */
2089 	hash_entry = hash_search(pgStatTabHash, &rel_id, HASH_ENTER, &found);
2090 	if (!found)
2091 	{
2092 		/* initialize new entry with null pointer */
2093 		hash_entry->tsa_entry = NULL;
2094 	}
2095 
2096 	/*
2097 	 * If entry is already valid, we're done.
2098 	 */
2099 	if (hash_entry->tsa_entry)
2100 		return hash_entry->tsa_entry;
2101 
2102 	/*
2103 	 * Locate the first pgStatTabList entry with free space, making a new list
2104 	 * entry if needed.  Note that we could get an OOM failure here, but if so
2105 	 * we have left the hashtable and the list in a consistent state.
2106 	 */
2107 	if (pgStatTabList == NULL)
2108 	{
2109 		/* Set up first pgStatTabList entry */
2110 		pgStatTabList = (TabStatusArray *)
2111 			MemoryContextAllocZero(TopMemoryContext,
2112 								   sizeof(TabStatusArray));
2113 	}
2114 
2115 	tsa = pgStatTabList;
2116 	while (tsa->tsa_used >= TABSTAT_QUANTUM)
2117 	{
2118 		if (tsa->tsa_next == NULL)
2119 			tsa->tsa_next = (TabStatusArray *)
2120 				MemoryContextAllocZero(TopMemoryContext,
2121 									   sizeof(TabStatusArray));
2122 		tsa = tsa->tsa_next;
2123 	}
2124 
2125 	/*
2126 	 * Allocate a PgStat_TableStatus entry within this list entry.  We assume
2127 	 * the entry was already zeroed, either at creation or after last use.
2128 	 */
2129 	entry = &tsa->tsa_entries[tsa->tsa_used++];
2130 	entry->t_id = rel_id;
2131 	entry->t_shared = isshared;
2132 
2133 	/*
2134 	 * Now we can fill the entry in pgStatTabHash.
2135 	 */
2136 	hash_entry->tsa_entry = entry;
2137 
2138 	return entry;
2139 }
2140 
2141 /*
2142  * find_tabstat_entry - find any existing PgStat_TableStatus entry for rel
2143  *
2144  * If no entry, return NULL, don't create a new one
2145  *
2146  * Note: if we got an error in the most recent execution of pgstat_report_stat,
2147  * it's possible that an entry exists but there's no hashtable entry for it.
2148  * That's okay, we'll treat this case as "doesn't exist".
2149  */
2150 PgStat_TableStatus *
find_tabstat_entry(Oid rel_id)2151 find_tabstat_entry(Oid rel_id)
2152 {
2153 	TabStatHashEntry *hash_entry;
2154 
2155 	/* If hashtable doesn't exist, there are no entries at all */
2156 	if (!pgStatTabHash)
2157 		return NULL;
2158 
2159 	hash_entry = hash_search(pgStatTabHash, &rel_id, HASH_FIND, NULL);
2160 	if (!hash_entry)
2161 		return NULL;
2162 
2163 	/* Note that this step could also return NULL, but that's correct */
2164 	return hash_entry->tsa_entry;
2165 }
2166 
2167 /*
2168  * get_tabstat_stack_level - add a new (sub)transaction stack entry if needed
2169  */
2170 static PgStat_SubXactStatus *
get_tabstat_stack_level(int nest_level)2171 get_tabstat_stack_level(int nest_level)
2172 {
2173 	PgStat_SubXactStatus *xact_state;
2174 
2175 	xact_state = pgStatXactStack;
2176 	if (xact_state == NULL || xact_state->nest_level != nest_level)
2177 	{
2178 		xact_state = (PgStat_SubXactStatus *)
2179 			MemoryContextAlloc(TopTransactionContext,
2180 							   sizeof(PgStat_SubXactStatus));
2181 		xact_state->nest_level = nest_level;
2182 		xact_state->prev = pgStatXactStack;
2183 		xact_state->first = NULL;
2184 		pgStatXactStack = xact_state;
2185 	}
2186 	return xact_state;
2187 }
2188 
2189 /*
2190  * add_tabstat_xact_level - add a new (sub)transaction state record
2191  */
2192 static void
add_tabstat_xact_level(PgStat_TableStatus * pgstat_info,int nest_level)2193 add_tabstat_xact_level(PgStat_TableStatus *pgstat_info, int nest_level)
2194 {
2195 	PgStat_SubXactStatus *xact_state;
2196 	PgStat_TableXactStatus *trans;
2197 
2198 	/*
2199 	 * If this is the first rel to be modified at the current nest level, we
2200 	 * first have to push a transaction stack entry.
2201 	 */
2202 	xact_state = get_tabstat_stack_level(nest_level);
2203 
2204 	/* Now make a per-table stack entry */
2205 	trans = (PgStat_TableXactStatus *)
2206 		MemoryContextAllocZero(TopTransactionContext,
2207 							   sizeof(PgStat_TableXactStatus));
2208 	trans->nest_level = nest_level;
2209 	trans->upper = pgstat_info->trans;
2210 	trans->parent = pgstat_info;
2211 	trans->next = xact_state->first;
2212 	xact_state->first = trans;
2213 	pgstat_info->trans = trans;
2214 }
2215 
2216 /*
2217  * pgstat_count_heap_insert - count a tuple insertion of n tuples
2218  */
2219 void
pgstat_count_heap_insert(Relation rel,PgStat_Counter n)2220 pgstat_count_heap_insert(Relation rel, PgStat_Counter n)
2221 {
2222 	PgStat_TableStatus *pgstat_info = rel->pgstat_info;
2223 
2224 	if (pgstat_info != NULL)
2225 	{
2226 		/* We have to log the effect at the proper transactional level */
2227 		int			nest_level = GetCurrentTransactionNestLevel();
2228 
2229 		if (pgstat_info->trans == NULL ||
2230 			pgstat_info->trans->nest_level != nest_level)
2231 			add_tabstat_xact_level(pgstat_info, nest_level);
2232 
2233 		pgstat_info->trans->tuples_inserted += n;
2234 	}
2235 }
2236 
2237 /*
2238  * pgstat_count_heap_update - count a tuple update
2239  */
2240 void
pgstat_count_heap_update(Relation rel,bool hot)2241 pgstat_count_heap_update(Relation rel, bool hot)
2242 {
2243 	PgStat_TableStatus *pgstat_info = rel->pgstat_info;
2244 
2245 	if (pgstat_info != NULL)
2246 	{
2247 		/* We have to log the effect at the proper transactional level */
2248 		int			nest_level = GetCurrentTransactionNestLevel();
2249 
2250 		if (pgstat_info->trans == NULL ||
2251 			pgstat_info->trans->nest_level != nest_level)
2252 			add_tabstat_xact_level(pgstat_info, nest_level);
2253 
2254 		pgstat_info->trans->tuples_updated++;
2255 
2256 		/* t_tuples_hot_updated is nontransactional, so just advance it */
2257 		if (hot)
2258 			pgstat_info->t_counts.t_tuples_hot_updated++;
2259 	}
2260 }
2261 
2262 /*
2263  * pgstat_count_heap_delete - count a tuple deletion
2264  */
2265 void
pgstat_count_heap_delete(Relation rel)2266 pgstat_count_heap_delete(Relation rel)
2267 {
2268 	PgStat_TableStatus *pgstat_info = rel->pgstat_info;
2269 
2270 	if (pgstat_info != NULL)
2271 	{
2272 		/* We have to log the effect at the proper transactional level */
2273 		int			nest_level = GetCurrentTransactionNestLevel();
2274 
2275 		if (pgstat_info->trans == NULL ||
2276 			pgstat_info->trans->nest_level != nest_level)
2277 			add_tabstat_xact_level(pgstat_info, nest_level);
2278 
2279 		pgstat_info->trans->tuples_deleted++;
2280 	}
2281 }
2282 
2283 /*
2284  * pgstat_truncate_save_counters
2285  *
2286  * Whenever a table is truncated, we save its i/u/d counters so that they can
2287  * be cleared, and if the (sub)xact that executed the truncate later aborts,
2288  * the counters can be restored to the saved (pre-truncate) values.  Note we do
2289  * this on the first truncate in any particular subxact level only.
2290  */
2291 static void
pgstat_truncate_save_counters(PgStat_TableXactStatus * trans)2292 pgstat_truncate_save_counters(PgStat_TableXactStatus *trans)
2293 {
2294 	if (!trans->truncated)
2295 	{
2296 		trans->inserted_pre_trunc = trans->tuples_inserted;
2297 		trans->updated_pre_trunc = trans->tuples_updated;
2298 		trans->deleted_pre_trunc = trans->tuples_deleted;
2299 		trans->truncated = true;
2300 	}
2301 }
2302 
2303 /*
2304  * pgstat_truncate_restore_counters - restore counters when a truncate aborts
2305  */
2306 static void
pgstat_truncate_restore_counters(PgStat_TableXactStatus * trans)2307 pgstat_truncate_restore_counters(PgStat_TableXactStatus *trans)
2308 {
2309 	if (trans->truncated)
2310 	{
2311 		trans->tuples_inserted = trans->inserted_pre_trunc;
2312 		trans->tuples_updated = trans->updated_pre_trunc;
2313 		trans->tuples_deleted = trans->deleted_pre_trunc;
2314 	}
2315 }
2316 
2317 /*
2318  * pgstat_count_truncate - update tuple counters due to truncate
2319  */
2320 void
pgstat_count_truncate(Relation rel)2321 pgstat_count_truncate(Relation rel)
2322 {
2323 	PgStat_TableStatus *pgstat_info = rel->pgstat_info;
2324 
2325 	if (pgstat_info != NULL)
2326 	{
2327 		/* We have to log the effect at the proper transactional level */
2328 		int			nest_level = GetCurrentTransactionNestLevel();
2329 
2330 		if (pgstat_info->trans == NULL ||
2331 			pgstat_info->trans->nest_level != nest_level)
2332 			add_tabstat_xact_level(pgstat_info, nest_level);
2333 
2334 		pgstat_truncate_save_counters(pgstat_info->trans);
2335 		pgstat_info->trans->tuples_inserted = 0;
2336 		pgstat_info->trans->tuples_updated = 0;
2337 		pgstat_info->trans->tuples_deleted = 0;
2338 	}
2339 }
2340 
2341 /*
2342  * pgstat_update_heap_dead_tuples - update dead-tuples count
2343  *
2344  * The semantics of this are that we are reporting the nontransactional
2345  * recovery of "delta" dead tuples; so t_delta_dead_tuples decreases
2346  * rather than increasing, and the change goes straight into the per-table
2347  * counter, not into transactional state.
2348  */
2349 void
pgstat_update_heap_dead_tuples(Relation rel,int delta)2350 pgstat_update_heap_dead_tuples(Relation rel, int delta)
2351 {
2352 	PgStat_TableStatus *pgstat_info = rel->pgstat_info;
2353 
2354 	if (pgstat_info != NULL)
2355 		pgstat_info->t_counts.t_delta_dead_tuples -= delta;
2356 }
2357 
2358 
2359 /* ----------
2360  * AtEOXact_PgStat
2361  *
2362  *	Called from access/transam/xact.c at top-level transaction commit/abort.
2363  * ----------
2364  */
2365 void
AtEOXact_PgStat(bool isCommit,bool parallel)2366 AtEOXact_PgStat(bool isCommit, bool parallel)
2367 {
2368 	PgStat_SubXactStatus *xact_state;
2369 
2370 	/* Don't count parallel worker transaction stats */
2371 	if (!parallel)
2372 	{
2373 		/*
2374 		 * Count transaction commit or abort.  (We use counters, not just
2375 		 * bools, in case the reporting message isn't sent right away.)
2376 		 */
2377 		if (isCommit)
2378 			pgStatXactCommit++;
2379 		else
2380 			pgStatXactRollback++;
2381 	}
2382 
2383 	/*
2384 	 * Transfer transactional insert/update counts into the base tabstat
2385 	 * entries.  We don't bother to free any of the transactional state, since
2386 	 * it's all in TopTransactionContext and will go away anyway.
2387 	 */
2388 	xact_state = pgStatXactStack;
2389 	if (xact_state != NULL)
2390 	{
2391 		PgStat_TableXactStatus *trans;
2392 
2393 		Assert(xact_state->nest_level == 1);
2394 		Assert(xact_state->prev == NULL);
2395 		for (trans = xact_state->first; trans != NULL; trans = trans->next)
2396 		{
2397 			PgStat_TableStatus *tabstat;
2398 
2399 			Assert(trans->nest_level == 1);
2400 			Assert(trans->upper == NULL);
2401 			tabstat = trans->parent;
2402 			Assert(tabstat->trans == trans);
2403 			/* restore pre-truncate stats (if any) in case of aborted xact */
2404 			if (!isCommit)
2405 				pgstat_truncate_restore_counters(trans);
2406 			/* count attempted actions regardless of commit/abort */
2407 			tabstat->t_counts.t_tuples_inserted += trans->tuples_inserted;
2408 			tabstat->t_counts.t_tuples_updated += trans->tuples_updated;
2409 			tabstat->t_counts.t_tuples_deleted += trans->tuples_deleted;
2410 			if (isCommit)
2411 			{
2412 				tabstat->t_counts.t_truncated = trans->truncated;
2413 				if (trans->truncated)
2414 				{
2415 					/* forget live/dead stats seen by backend thus far */
2416 					tabstat->t_counts.t_delta_live_tuples = 0;
2417 					tabstat->t_counts.t_delta_dead_tuples = 0;
2418 				}
2419 				/* insert adds a live tuple, delete removes one */
2420 				tabstat->t_counts.t_delta_live_tuples +=
2421 					trans->tuples_inserted - trans->tuples_deleted;
2422 				/* update and delete each create a dead tuple */
2423 				tabstat->t_counts.t_delta_dead_tuples +=
2424 					trans->tuples_updated + trans->tuples_deleted;
2425 				/* insert, update, delete each count as one change event */
2426 				tabstat->t_counts.t_changed_tuples +=
2427 					trans->tuples_inserted + trans->tuples_updated +
2428 					trans->tuples_deleted;
2429 			}
2430 			else
2431 			{
2432 				/* inserted tuples are dead, deleted tuples are unaffected */
2433 				tabstat->t_counts.t_delta_dead_tuples +=
2434 					trans->tuples_inserted + trans->tuples_updated;
2435 				/* an aborted xact generates no changed_tuple events */
2436 			}
2437 			tabstat->trans = NULL;
2438 		}
2439 	}
2440 	pgStatXactStack = NULL;
2441 
2442 	/* Make sure any stats snapshot is thrown away */
2443 	pgstat_clear_snapshot();
2444 }
2445 
2446 /* ----------
2447  * AtEOSubXact_PgStat
2448  *
2449  *	Called from access/transam/xact.c at subtransaction commit/abort.
2450  * ----------
2451  */
2452 void
AtEOSubXact_PgStat(bool isCommit,int nestDepth)2453 AtEOSubXact_PgStat(bool isCommit, int nestDepth)
2454 {
2455 	PgStat_SubXactStatus *xact_state;
2456 
2457 	/*
2458 	 * Transfer transactional insert/update counts into the next higher
2459 	 * subtransaction state.
2460 	 */
2461 	xact_state = pgStatXactStack;
2462 	if (xact_state != NULL &&
2463 		xact_state->nest_level >= nestDepth)
2464 	{
2465 		PgStat_TableXactStatus *trans;
2466 		PgStat_TableXactStatus *next_trans;
2467 
2468 		/* delink xact_state from stack immediately to simplify reuse case */
2469 		pgStatXactStack = xact_state->prev;
2470 
2471 		for (trans = xact_state->first; trans != NULL; trans = next_trans)
2472 		{
2473 			PgStat_TableStatus *tabstat;
2474 
2475 			next_trans = trans->next;
2476 			Assert(trans->nest_level == nestDepth);
2477 			tabstat = trans->parent;
2478 			Assert(tabstat->trans == trans);
2479 			if (isCommit)
2480 			{
2481 				if (trans->upper && trans->upper->nest_level == nestDepth - 1)
2482 				{
2483 					if (trans->truncated)
2484 					{
2485 						/* propagate the truncate status one level up */
2486 						pgstat_truncate_save_counters(trans->upper);
2487 						/* replace upper xact stats with ours */
2488 						trans->upper->tuples_inserted = trans->tuples_inserted;
2489 						trans->upper->tuples_updated = trans->tuples_updated;
2490 						trans->upper->tuples_deleted = trans->tuples_deleted;
2491 					}
2492 					else
2493 					{
2494 						trans->upper->tuples_inserted += trans->tuples_inserted;
2495 						trans->upper->tuples_updated += trans->tuples_updated;
2496 						trans->upper->tuples_deleted += trans->tuples_deleted;
2497 					}
2498 					tabstat->trans = trans->upper;
2499 					pfree(trans);
2500 				}
2501 				else
2502 				{
2503 					/*
2504 					 * When there isn't an immediate parent state, we can just
2505 					 * reuse the record instead of going through a
2506 					 * palloc/pfree pushup (this works since it's all in
2507 					 * TopTransactionContext anyway).  We have to re-link it
2508 					 * into the parent level, though, and that might mean
2509 					 * pushing a new entry into the pgStatXactStack.
2510 					 */
2511 					PgStat_SubXactStatus *upper_xact_state;
2512 
2513 					upper_xact_state = get_tabstat_stack_level(nestDepth - 1);
2514 					trans->next = upper_xact_state->first;
2515 					upper_xact_state->first = trans;
2516 					trans->nest_level = nestDepth - 1;
2517 				}
2518 			}
2519 			else
2520 			{
2521 				/*
2522 				 * On abort, update top-level tabstat counts, then forget the
2523 				 * subtransaction
2524 				 */
2525 
2526 				/* first restore values obliterated by truncate */
2527 				pgstat_truncate_restore_counters(trans);
2528 				/* count attempted actions regardless of commit/abort */
2529 				tabstat->t_counts.t_tuples_inserted += trans->tuples_inserted;
2530 				tabstat->t_counts.t_tuples_updated += trans->tuples_updated;
2531 				tabstat->t_counts.t_tuples_deleted += trans->tuples_deleted;
2532 				/* inserted tuples are dead, deleted tuples are unaffected */
2533 				tabstat->t_counts.t_delta_dead_tuples +=
2534 					trans->tuples_inserted + trans->tuples_updated;
2535 				tabstat->trans = trans->upper;
2536 				pfree(trans);
2537 			}
2538 		}
2539 		pfree(xact_state);
2540 	}
2541 }
2542 
2543 
2544 /*
2545  * AtPrepare_PgStat
2546  *		Save the transactional stats state at 2PC transaction prepare.
2547  *
2548  * In this phase we just generate 2PC records for all the pending
2549  * transaction-dependent stats work.
2550  */
2551 void
AtPrepare_PgStat(void)2552 AtPrepare_PgStat(void)
2553 {
2554 	PgStat_SubXactStatus *xact_state;
2555 
2556 	xact_state = pgStatXactStack;
2557 	if (xact_state != NULL)
2558 	{
2559 		PgStat_TableXactStatus *trans;
2560 
2561 		Assert(xact_state->nest_level == 1);
2562 		Assert(xact_state->prev == NULL);
2563 		for (trans = xact_state->first; trans != NULL; trans = trans->next)
2564 		{
2565 			PgStat_TableStatus *tabstat;
2566 			TwoPhasePgStatRecord record;
2567 
2568 			Assert(trans->nest_level == 1);
2569 			Assert(trans->upper == NULL);
2570 			tabstat = trans->parent;
2571 			Assert(tabstat->trans == trans);
2572 
2573 			record.tuples_inserted = trans->tuples_inserted;
2574 			record.tuples_updated = trans->tuples_updated;
2575 			record.tuples_deleted = trans->tuples_deleted;
2576 			record.inserted_pre_trunc = trans->inserted_pre_trunc;
2577 			record.updated_pre_trunc = trans->updated_pre_trunc;
2578 			record.deleted_pre_trunc = trans->deleted_pre_trunc;
2579 			record.t_id = tabstat->t_id;
2580 			record.t_shared = tabstat->t_shared;
2581 			record.t_truncated = trans->truncated;
2582 
2583 			RegisterTwoPhaseRecord(TWOPHASE_RM_PGSTAT_ID, 0,
2584 								   &record, sizeof(TwoPhasePgStatRecord));
2585 		}
2586 	}
2587 }
2588 
2589 /*
2590  * PostPrepare_PgStat
2591  *		Clean up after successful PREPARE.
2592  *
2593  * All we need do here is unlink the transaction stats state from the
2594  * nontransactional state.  The nontransactional action counts will be
2595  * reported to the stats collector immediately, while the effects on live
2596  * and dead tuple counts are preserved in the 2PC state file.
2597  *
2598  * Note: AtEOXact_PgStat is not called during PREPARE.
2599  */
2600 void
PostPrepare_PgStat(void)2601 PostPrepare_PgStat(void)
2602 {
2603 	PgStat_SubXactStatus *xact_state;
2604 
2605 	/*
2606 	 * We don't bother to free any of the transactional state, since it's all
2607 	 * in TopTransactionContext and will go away anyway.
2608 	 */
2609 	xact_state = pgStatXactStack;
2610 	if (xact_state != NULL)
2611 	{
2612 		PgStat_TableXactStatus *trans;
2613 
2614 		for (trans = xact_state->first; trans != NULL; trans = trans->next)
2615 		{
2616 			PgStat_TableStatus *tabstat;
2617 
2618 			tabstat = trans->parent;
2619 			tabstat->trans = NULL;
2620 		}
2621 	}
2622 	pgStatXactStack = NULL;
2623 
2624 	/* Make sure any stats snapshot is thrown away */
2625 	pgstat_clear_snapshot();
2626 }
2627 
2628 /*
2629  * 2PC processing routine for COMMIT PREPARED case.
2630  *
2631  * Load the saved counts into our local pgstats state.
2632  */
2633 void
pgstat_twophase_postcommit(TransactionId xid,uint16 info,void * recdata,uint32 len)2634 pgstat_twophase_postcommit(TransactionId xid, uint16 info,
2635 						   void *recdata, uint32 len)
2636 {
2637 	TwoPhasePgStatRecord *rec = (TwoPhasePgStatRecord *) recdata;
2638 	PgStat_TableStatus *pgstat_info;
2639 
2640 	/* Find or create a tabstat entry for the rel */
2641 	pgstat_info = get_tabstat_entry(rec->t_id, rec->t_shared);
2642 
2643 	/* Same math as in AtEOXact_PgStat, commit case */
2644 	pgstat_info->t_counts.t_tuples_inserted += rec->tuples_inserted;
2645 	pgstat_info->t_counts.t_tuples_updated += rec->tuples_updated;
2646 	pgstat_info->t_counts.t_tuples_deleted += rec->tuples_deleted;
2647 	pgstat_info->t_counts.t_truncated = rec->t_truncated;
2648 	if (rec->t_truncated)
2649 	{
2650 		/* forget live/dead stats seen by backend thus far */
2651 		pgstat_info->t_counts.t_delta_live_tuples = 0;
2652 		pgstat_info->t_counts.t_delta_dead_tuples = 0;
2653 	}
2654 	pgstat_info->t_counts.t_delta_live_tuples +=
2655 		rec->tuples_inserted - rec->tuples_deleted;
2656 	pgstat_info->t_counts.t_delta_dead_tuples +=
2657 		rec->tuples_updated + rec->tuples_deleted;
2658 	pgstat_info->t_counts.t_changed_tuples +=
2659 		rec->tuples_inserted + rec->tuples_updated +
2660 		rec->tuples_deleted;
2661 }
2662 
2663 /*
2664  * 2PC processing routine for ROLLBACK PREPARED case.
2665  *
2666  * Load the saved counts into our local pgstats state, but treat them
2667  * as aborted.
2668  */
2669 void
pgstat_twophase_postabort(TransactionId xid,uint16 info,void * recdata,uint32 len)2670 pgstat_twophase_postabort(TransactionId xid, uint16 info,
2671 						  void *recdata, uint32 len)
2672 {
2673 	TwoPhasePgStatRecord *rec = (TwoPhasePgStatRecord *) recdata;
2674 	PgStat_TableStatus *pgstat_info;
2675 
2676 	/* Find or create a tabstat entry for the rel */
2677 	pgstat_info = get_tabstat_entry(rec->t_id, rec->t_shared);
2678 
2679 	/* Same math as in AtEOXact_PgStat, abort case */
2680 	if (rec->t_truncated)
2681 	{
2682 		rec->tuples_inserted = rec->inserted_pre_trunc;
2683 		rec->tuples_updated = rec->updated_pre_trunc;
2684 		rec->tuples_deleted = rec->deleted_pre_trunc;
2685 	}
2686 	pgstat_info->t_counts.t_tuples_inserted += rec->tuples_inserted;
2687 	pgstat_info->t_counts.t_tuples_updated += rec->tuples_updated;
2688 	pgstat_info->t_counts.t_tuples_deleted += rec->tuples_deleted;
2689 	pgstat_info->t_counts.t_delta_dead_tuples +=
2690 		rec->tuples_inserted + rec->tuples_updated;
2691 }
2692 
2693 
2694 /* ----------
2695  * pgstat_fetch_stat_dbentry() -
2696  *
2697  *	Support function for the SQL-callable pgstat* functions. Returns
2698  *	the collected statistics for one database or NULL. NULL doesn't mean
2699  *	that the database doesn't exist, it is just not yet known by the
2700  *	collector, so the caller is better off to report ZERO instead.
2701  * ----------
2702  */
2703 PgStat_StatDBEntry *
pgstat_fetch_stat_dbentry(Oid dbid)2704 pgstat_fetch_stat_dbentry(Oid dbid)
2705 {
2706 	/*
2707 	 * If not done for this transaction, read the statistics collector stats
2708 	 * file into some hash tables.
2709 	 */
2710 	backend_read_statsfile();
2711 
2712 	/*
2713 	 * Lookup the requested database; return NULL if not found
2714 	 */
2715 	return (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2716 											  (void *) &dbid,
2717 											  HASH_FIND, NULL);
2718 }
2719 
2720 
2721 /* ----------
2722  * pgstat_fetch_stat_tabentry() -
2723  *
2724  *	Support function for the SQL-callable pgstat* functions. Returns
2725  *	the collected statistics for one table or NULL. NULL doesn't mean
2726  *	that the table doesn't exist, it is just not yet known by the
2727  *	collector, so the caller is better off to report ZERO instead.
2728  * ----------
2729  */
2730 PgStat_StatTabEntry *
pgstat_fetch_stat_tabentry(Oid relid)2731 pgstat_fetch_stat_tabentry(Oid relid)
2732 {
2733 	Oid			dbid;
2734 	PgStat_StatDBEntry *dbentry;
2735 	PgStat_StatTabEntry *tabentry;
2736 
2737 	/*
2738 	 * If not done for this transaction, read the statistics collector stats
2739 	 * file into some hash tables.
2740 	 */
2741 	backend_read_statsfile();
2742 
2743 	/*
2744 	 * Lookup our database, then look in its table hash table.
2745 	 */
2746 	dbid = MyDatabaseId;
2747 	dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2748 												 (void *) &dbid,
2749 												 HASH_FIND, NULL);
2750 	if (dbentry != NULL && dbentry->tables != NULL)
2751 	{
2752 		tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
2753 													   (void *) &relid,
2754 													   HASH_FIND, NULL);
2755 		if (tabentry)
2756 			return tabentry;
2757 	}
2758 
2759 	/*
2760 	 * If we didn't find it, maybe it's a shared table.
2761 	 */
2762 	dbid = InvalidOid;
2763 	dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2764 												 (void *) &dbid,
2765 												 HASH_FIND, NULL);
2766 	if (dbentry != NULL && dbentry->tables != NULL)
2767 	{
2768 		tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
2769 													   (void *) &relid,
2770 													   HASH_FIND, NULL);
2771 		if (tabentry)
2772 			return tabentry;
2773 	}
2774 
2775 	return NULL;
2776 }
2777 
2778 
2779 /* ----------
2780  * pgstat_fetch_stat_funcentry() -
2781  *
2782  *	Support function for the SQL-callable pgstat* functions. Returns
2783  *	the collected statistics for one function or NULL.
2784  * ----------
2785  */
2786 PgStat_StatFuncEntry *
pgstat_fetch_stat_funcentry(Oid func_id)2787 pgstat_fetch_stat_funcentry(Oid func_id)
2788 {
2789 	PgStat_StatDBEntry *dbentry;
2790 	PgStat_StatFuncEntry *funcentry = NULL;
2791 
2792 	/* load the stats file if needed */
2793 	backend_read_statsfile();
2794 
2795 	/* Lookup our database, then find the requested function.  */
2796 	dbentry = pgstat_fetch_stat_dbentry(MyDatabaseId);
2797 	if (dbentry != NULL && dbentry->functions != NULL)
2798 	{
2799 		funcentry = (PgStat_StatFuncEntry *) hash_search(dbentry->functions,
2800 														 (void *) &func_id,
2801 														 HASH_FIND, NULL);
2802 	}
2803 
2804 	return funcentry;
2805 }
2806 
2807 
2808 /*
2809  * ---------
2810  * pgstat_fetch_stat_archiver() -
2811  *
2812  *	Support function for the SQL-callable pgstat* functions. Returns
2813  *	a pointer to the archiver statistics struct.
2814  * ---------
2815  */
2816 PgStat_ArchiverStats *
pgstat_fetch_stat_archiver(void)2817 pgstat_fetch_stat_archiver(void)
2818 {
2819 	backend_read_statsfile();
2820 
2821 	return &archiverStats;
2822 }
2823 
2824 
2825 /*
2826  * ---------
2827  * pgstat_fetch_global() -
2828  *
2829  *	Support function for the SQL-callable pgstat* functions. Returns
2830  *	a pointer to the global statistics struct.
2831  * ---------
2832  */
2833 PgStat_GlobalStats *
pgstat_fetch_global(void)2834 pgstat_fetch_global(void)
2835 {
2836 	backend_read_statsfile();
2837 
2838 	return &globalStats;
2839 }
2840 
2841 /*
2842  * ---------
2843  * pgstat_fetch_stat_wal() -
2844  *
2845  *	Support function for the SQL-callable pgstat* functions. Returns
2846  *	a pointer to the WAL statistics struct.
2847  * ---------
2848  */
2849 PgStat_WalStats *
pgstat_fetch_stat_wal(void)2850 pgstat_fetch_stat_wal(void)
2851 {
2852 	backend_read_statsfile();
2853 
2854 	return &walStats;
2855 }
2856 
2857 /*
2858  * ---------
2859  * pgstat_fetch_slru() -
2860  *
2861  *	Support function for the SQL-callable pgstat* functions. Returns
2862  *	a pointer to the slru statistics struct.
2863  * ---------
2864  */
2865 PgStat_SLRUStats *
pgstat_fetch_slru(void)2866 pgstat_fetch_slru(void)
2867 {
2868 	backend_read_statsfile();
2869 
2870 	return slruStats;
2871 }
2872 
2873 /*
2874  * ---------
2875  * pgstat_fetch_replslot() -
2876  *
2877  *	Support function for the SQL-callable pgstat* functions. Returns
2878  *	a pointer to the replication slot statistics struct.
2879  * ---------
2880  */
2881 PgStat_StatReplSlotEntry *
pgstat_fetch_replslot(NameData slotname)2882 pgstat_fetch_replslot(NameData slotname)
2883 {
2884 	backend_read_statsfile();
2885 
2886 	return pgstat_get_replslot_entry(slotname, false);
2887 }
2888 
2889 /*
2890  * Shut down a single backend's statistics reporting at process exit.
2891  *
2892  * Flush any remaining statistics counts out to the collector.
2893  * Without this, operations triggered during backend exit (such as
2894  * temp table deletions) won't be counted.
2895  */
2896 static void
pgstat_shutdown_hook(int code,Datum arg)2897 pgstat_shutdown_hook(int code, Datum arg)
2898 {
2899 	/*
2900 	 * If we got as far as discovering our own database ID, we can report what
2901 	 * we did to the collector.  Otherwise, we'd be sending an invalid
2902 	 * database ID, so forget it.  (This means that accesses to pg_database
2903 	 * during failed backend starts might never get counted.)
2904 	 */
2905 	if (OidIsValid(MyDatabaseId))
2906 		pgstat_report_stat(true);
2907 }
2908 
2909 /* ----------
2910  * pgstat_initialize() -
2911  *
2912  *	Initialize pgstats state, and set up our on-proc-exit hook.
2913  *	Called from InitPostgres and AuxiliaryProcessMain.
2914  *
2915  *	NOTE: MyDatabaseId isn't set yet; so the shutdown hook has to be careful.
2916  * ----------
2917  */
2918 void
pgstat_initialize(void)2919 pgstat_initialize(void)
2920 {
2921 	/*
2922 	 * Initialize prevWalUsage with pgWalUsage so that pgstat_send_wal() can
2923 	 * calculate how much pgWalUsage counters are increased by substracting
2924 	 * prevWalUsage from pgWalUsage.
2925 	 */
2926 	prevWalUsage = pgWalUsage;
2927 
2928 	/* Set up a process-exit hook to clean up */
2929 	on_shmem_exit(pgstat_shutdown_hook, 0);
2930 }
2931 
2932 /* ------------------------------------------------------------
2933  * Local support functions follow
2934  * ------------------------------------------------------------
2935  */
2936 
2937 
/* ----------
 * pgstat_setheader() -
 *
 *		Set common header fields in a statistics message.
 *
 *	'hdr' is the header of the message being prepared and 'mtype' the
 *	message type to stamp into it.  Only m_type is written here; the m_size
 *	field is filled in separately by pgstat_send().
 * ----------
 */
static void
pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype)
{
	hdr->m_type = mtype;
}
2949 
2950 
2951 /* ----------
2952  * pgstat_send() -
2953  *
2954  *		Send out one statistics message to the collector
2955  * ----------
2956  */
2957 static void
pgstat_send(void * msg,int len)2958 pgstat_send(void *msg, int len)
2959 {
2960 	int			rc;
2961 
2962 	if (pgStatSock == PGINVALID_SOCKET)
2963 		return;
2964 
2965 	((PgStat_MsgHdr *) msg)->m_size = len;
2966 
2967 	/* We'll retry after EINTR, but ignore all other failures */
2968 	do
2969 	{
2970 		rc = send(pgStatSock, msg, len, 0);
2971 	} while (rc < 0 && errno == EINTR);
2972 
2973 #ifdef USE_ASSERT_CHECKING
2974 	/* In debug builds, log send failures ... */
2975 	if (rc < 0)
2976 		elog(LOG, "could not send to statistics collector: %m");
2977 #endif
2978 }
2979 
2980 /* ----------
2981  * pgstat_send_archiver() -
2982  *
2983  *	Tell the collector about the WAL file that we successfully
2984  *	archived or failed to archive.
2985  * ----------
2986  */
2987 void
pgstat_send_archiver(const char * xlog,bool failed)2988 pgstat_send_archiver(const char *xlog, bool failed)
2989 {
2990 	PgStat_MsgArchiver msg;
2991 
2992 	/*
2993 	 * Prepare and send the message
2994 	 */
2995 	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ARCHIVER);
2996 	msg.m_failed = failed;
2997 	strlcpy(msg.m_xlog, xlog, sizeof(msg.m_xlog));
2998 	msg.m_timestamp = GetCurrentTimestamp();
2999 	pgstat_send(&msg, sizeof(msg));
3000 }
3001 
3002 /* ----------
3003  * pgstat_send_bgwriter() -
3004  *
3005  *		Send bgwriter statistics to the collector
3006  * ----------
3007  */
3008 void
pgstat_send_bgwriter(void)3009 pgstat_send_bgwriter(void)
3010 {
3011 	/* We assume this initializes to zeroes */
3012 	static const PgStat_MsgBgWriter all_zeroes;
3013 
3014 	/*
3015 	 * This function can be called even if nothing at all has happened. In
3016 	 * this case, avoid sending a completely empty message to the stats
3017 	 * collector.
3018 	 */
3019 	if (memcmp(&BgWriterStats, &all_zeroes, sizeof(PgStat_MsgBgWriter)) == 0)
3020 		return;
3021 
3022 	/*
3023 	 * Prepare and send the message
3024 	 */
3025 	pgstat_setheader(&BgWriterStats.m_hdr, PGSTAT_MTYPE_BGWRITER);
3026 	pgstat_send(&BgWriterStats, sizeof(BgWriterStats));
3027 
3028 	/*
3029 	 * Clear out the statistics buffer, so it can be re-used.
3030 	 */
3031 	MemSet(&BgWriterStats, 0, sizeof(BgWriterStats));
3032 }
3033 
3034 /* ----------
3035  * pgstat_send_wal() -
3036  *
3037  *	Send WAL statistics to the collector.
3038  *
3039  * If 'force' is not set, WAL stats message is only sent if enough time has
3040  * passed since last one was sent to reach PGSTAT_STAT_INTERVAL.
3041  * ----------
3042  */
3043 void
pgstat_send_wal(bool force)3044 pgstat_send_wal(bool force)
3045 {
3046 	static TimestampTz sendTime = 0;
3047 
3048 	/*
3049 	 * This function can be called even if nothing at all has happened. In
3050 	 * this case, avoid sending a completely empty message to the stats
3051 	 * collector.
3052 	 *
3053 	 * Check wal_records counter to determine whether any WAL activity has
3054 	 * happened since last time. Note that other WalUsage counters don't need
3055 	 * to be checked because they are incremented always together with
3056 	 * wal_records counter.
3057 	 *
3058 	 * m_wal_buffers_full also doesn't need to be checked because it's
3059 	 * incremented only when at least one WAL record is generated (i.e.,
3060 	 * wal_records counter is incremented). But for safely, we assert that
3061 	 * m_wal_buffers_full is always zero when no WAL record is generated
3062 	 *
3063 	 * This function can be called by a process like walwriter that normally
3064 	 * generates no WAL records. To determine whether any WAL activity has
3065 	 * happened at that process since the last time, the numbers of WAL writes
3066 	 * and syncs are also checked.
3067 	 */
3068 	if (pgWalUsage.wal_records == prevWalUsage.wal_records &&
3069 		WalStats.m_wal_write == 0 && WalStats.m_wal_sync == 0)
3070 	{
3071 		Assert(WalStats.m_wal_buffers_full == 0);
3072 		return;
3073 	}
3074 
3075 	if (!force)
3076 	{
3077 		TimestampTz now = GetCurrentTimestamp();
3078 
3079 		/*
3080 		 * Don't send a message unless it's been at least PGSTAT_STAT_INTERVAL
3081 		 * msec since we last sent one to avoid overloading the stats
3082 		 * collector.
3083 		 */
3084 		if (!TimestampDifferenceExceeds(sendTime, now, PGSTAT_STAT_INTERVAL))
3085 			return;
3086 		sendTime = now;
3087 	}
3088 
3089 	/*
3090 	 * Set the counters related to generated WAL data if the counters were
3091 	 * updated.
3092 	 */
3093 	if (pgWalUsage.wal_records != prevWalUsage.wal_records)
3094 	{
3095 		WalUsage	walusage;
3096 
3097 		/*
3098 		 * Calculate how much WAL usage counters were increased by
3099 		 * substracting the previous counters from the current ones. Fill the
3100 		 * results in WAL stats message.
3101 		 */
3102 		MemSet(&walusage, 0, sizeof(WalUsage));
3103 		WalUsageAccumDiff(&walusage, &pgWalUsage, &prevWalUsage);
3104 
3105 		WalStats.m_wal_records = walusage.wal_records;
3106 		WalStats.m_wal_fpi = walusage.wal_fpi;
3107 		WalStats.m_wal_bytes = walusage.wal_bytes;
3108 
3109 		/*
3110 		 * Save the current counters for the subsequent calculation of WAL
3111 		 * usage.
3112 		 */
3113 		prevWalUsage = pgWalUsage;
3114 	}
3115 
3116 	/*
3117 	 * Prepare and send the message
3118 	 */
3119 	pgstat_setheader(&WalStats.m_hdr, PGSTAT_MTYPE_WAL);
3120 	pgstat_send(&WalStats, sizeof(WalStats));
3121 
3122 	/*
3123 	 * Clear out the statistics buffer, so it can be re-used.
3124 	 */
3125 	MemSet(&WalStats, 0, sizeof(WalStats));
3126 }
3127 
3128 /* ----------
3129  * pgstat_send_slru() -
3130  *
3131  *		Send SLRU statistics to the collector
3132  * ----------
3133  */
3134 static void
pgstat_send_slru(void)3135 pgstat_send_slru(void)
3136 {
3137 	/* We assume this initializes to zeroes */
3138 	static const PgStat_MsgSLRU all_zeroes;
3139 
3140 	for (int i = 0; i < SLRU_NUM_ELEMENTS; i++)
3141 	{
3142 		/*
3143 		 * This function can be called even if nothing at all has happened. In
3144 		 * this case, avoid sending a completely empty message to the stats
3145 		 * collector.
3146 		 */
3147 		if (memcmp(&SLRUStats[i], &all_zeroes, sizeof(PgStat_MsgSLRU)) == 0)
3148 			continue;
3149 
3150 		/* set the SLRU type before each send */
3151 		SLRUStats[i].m_index = i;
3152 
3153 		/*
3154 		 * Prepare and send the message
3155 		 */
3156 		pgstat_setheader(&SLRUStats[i].m_hdr, PGSTAT_MTYPE_SLRU);
3157 		pgstat_send(&SLRUStats[i], sizeof(PgStat_MsgSLRU));
3158 
3159 		/*
3160 		 * Clear out the statistics buffer, so it can be re-used.
3161 		 */
3162 		MemSet(&SLRUStats[i], 0, sizeof(PgStat_MsgSLRU));
3163 	}
3164 }
3165 
3166 
/* ----------
 * PgstatCollectorMain() -
 *
 *	Start up the statistics collector process.  This is the body of the
 *	postmaster child process.
 *
 *	The argc/argv parameters are valid only in EXEC_BACKEND case.
 *
 *	Overview of what happens here:
 *	  1. install the collector's signal dispositions;
 *	  2. load the permanent stats files (or start from zero);
 *	  3. loop, receiving messages from pgStatSock and dispatching each one
 *		 to the matching pgstat_recv_* routine, writing stats files when
 *		 requested;
 *	  4. on SIGQUIT or postmaster death, write the permanent stats files
 *		 and exit(0).
 *
 *	This function does not return.
 * ----------
 */
NON_EXEC_STATIC void
PgstatCollectorMain(int argc, char *argv[])
{
	int			len;			/* result of recv(): message length or < 0 */
	PgStat_Msg	msg;			/* receive buffer for any message type */
	int			wr;				/* result of WaitEventSetWait() */
	WaitEvent	event;
	WaitEventSet *wes;

	/*
	 * Ignore all signals usually bound to some action in the postmaster,
	 * except SIGHUP and SIGQUIT.  Note we don't need a SIGUSR1 handler to
	 * support latch operations, because we only use a local latch.
	 */
	pqsignal(SIGHUP, SignalHandlerForConfigReload);
	pqsignal(SIGINT, SIG_IGN);
	pqsignal(SIGTERM, SIG_IGN);
	pqsignal(SIGQUIT, SignalHandlerForShutdownRequest);
	pqsignal(SIGALRM, SIG_IGN);
	pqsignal(SIGPIPE, SIG_IGN);
	pqsignal(SIGUSR1, SIG_IGN);
	pqsignal(SIGUSR2, SIG_IGN);
	/* Reset some signals that are accepted by postmaster but not here */
	pqsignal(SIGCHLD, SIG_DFL);
	PG_SETMASK(&UnBlockSig);

	MyBackendType = B_STATS_COLLECTOR;
	init_ps_display(NULL);

	/*
	 * Read in existing stats files or initialize the stats to zero.  The
	 * arguments request all databases (InvalidOid), the permanent files, and
	 * a deep read.
	 */
	pgStatRunningInCollector = true;
	pgStatDBHash = pgstat_read_statsfiles(InvalidOid, true, true);

	/* Prepare to wait for our latch or data in our socket. */
	wes = CreateWaitEventSet(CurrentMemoryContext, 3);
	AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
	AddWaitEventToSet(wes, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL);
	AddWaitEventToSet(wes, WL_SOCKET_READABLE, pgStatSock, NULL, NULL);

	/*
	 * Loop to process messages until we get SIGQUIT or detect ungraceful
	 * death of our parent postmaster.
	 *
	 * For performance reasons, we don't want to do ResetLatch/WaitLatch after
	 * every message; instead, do that only after a recv() fails to obtain a
	 * message.  (This effectively means that if backends are sending us stuff
	 * like mad, we won't notice postmaster death until things slack off a
	 * bit; which seems fine.)	To do that, we have an inner loop that
	 * iterates as long as recv() succeeds.  We do check ConfigReloadPending
	 * inside the inner loop, which means that such interrupts will get
	 * serviced but the latch won't get cleared until next time there is a
	 * break in the action.
	 */
	for (;;)
	{
		/* Clear any already-pending wakeups */
		ResetLatch(MyLatch);

		/*
		 * Quit if we get SIGQUIT from the postmaster.
		 */
		if (ShutdownRequestPending)
			break;

		/*
		 * Inner loop iterates as long as we keep getting messages, or until
		 * ShutdownRequestPending becomes set.
		 */
		while (!ShutdownRequestPending)
		{
			/*
			 * Reload configuration if we got SIGHUP from the postmaster.
			 */
			if (ConfigReloadPending)
			{
				ConfigReloadPending = false;
				ProcessConfigFile(PGC_SIGHUP);
			}

			/*
			 * Write the stats file(s) if a new request has arrived that is
			 * not satisfied by existing file(s).
			 */
			if (pgstat_write_statsfile_needed())
				pgstat_write_statsfiles(false, false);

			/*
			 * Try to receive and process a message.  This will not block,
			 * since the socket is set to non-blocking mode.
			 *
			 * XXX On Windows, we have to force pgwin32_recv to cooperate,
			 * despite the previous use of pg_set_noblock() on the socket.
			 * This is extremely broken and should be fixed someday.
			 */
#ifdef WIN32
			pgwin32_noblock = 1;
#endif

			len = recv(pgStatSock, (char *) &msg,
					   sizeof(PgStat_Msg), 0);

#ifdef WIN32
			pgwin32_noblock = 0;
#endif

			if (len < 0)
			{
				/*
				 * Nothing available right now (or we were interrupted): go
				 * wait on the event set.  Anything else is a real error.
				 */
				if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
					break;		/* out of inner loop */
				ereport(ERROR,
						(errcode_for_socket_access(),
						 errmsg("could not read statistics message: %m")));
			}

			/*
			 * We ignore messages that are smaller than our common header.
			 * (len is known to be non-negative at this point, since the
			 * negative case was dealt with just above.)
			 */
			if (len < sizeof(PgStat_MsgHdr))
				continue;

			/*
			 * The received length must match the length in the header
			 */
			if (msg.msg_hdr.m_size != len)
				continue;

			/*
			 * O.K. - we accept this message.  Process it.  Each message type
			 * is handed to its own receive routine; unknown types are
			 * silently dropped by the default case.
			 */
			switch (msg.msg_hdr.m_type)
			{
				case PGSTAT_MTYPE_DUMMY:
					break;

				case PGSTAT_MTYPE_INQUIRY:
					pgstat_recv_inquiry(&msg.msg_inquiry, len);
					break;

				case PGSTAT_MTYPE_TABSTAT:
					pgstat_recv_tabstat(&msg.msg_tabstat, len);
					break;

				case PGSTAT_MTYPE_TABPURGE:
					pgstat_recv_tabpurge(&msg.msg_tabpurge, len);
					break;

				case PGSTAT_MTYPE_DROPDB:
					pgstat_recv_dropdb(&msg.msg_dropdb, len);
					break;

				case PGSTAT_MTYPE_RESETCOUNTER:
					pgstat_recv_resetcounter(&msg.msg_resetcounter, len);
					break;

				case PGSTAT_MTYPE_RESETSHAREDCOUNTER:
					pgstat_recv_resetsharedcounter(&msg.msg_resetsharedcounter,
												   len);
					break;

				case PGSTAT_MTYPE_RESETSINGLECOUNTER:
					pgstat_recv_resetsinglecounter(&msg.msg_resetsinglecounter,
												   len);
					break;

				case PGSTAT_MTYPE_RESETSLRUCOUNTER:
					pgstat_recv_resetslrucounter(&msg.msg_resetslrucounter,
												 len);
					break;

				case PGSTAT_MTYPE_RESETREPLSLOTCOUNTER:
					pgstat_recv_resetreplslotcounter(&msg.msg_resetreplslotcounter,
													 len);
					break;

				case PGSTAT_MTYPE_AUTOVAC_START:
					pgstat_recv_autovac(&msg.msg_autovacuum_start, len);
					break;

				case PGSTAT_MTYPE_VACUUM:
					pgstat_recv_vacuum(&msg.msg_vacuum, len);
					break;

				case PGSTAT_MTYPE_ANALYZE:
					pgstat_recv_analyze(&msg.msg_analyze, len);
					break;

				case PGSTAT_MTYPE_ARCHIVER:
					pgstat_recv_archiver(&msg.msg_archiver, len);
					break;

				case PGSTAT_MTYPE_BGWRITER:
					pgstat_recv_bgwriter(&msg.msg_bgwriter, len);
					break;

				case PGSTAT_MTYPE_WAL:
					pgstat_recv_wal(&msg.msg_wal, len);
					break;

				case PGSTAT_MTYPE_SLRU:
					pgstat_recv_slru(&msg.msg_slru, len);
					break;

				case PGSTAT_MTYPE_FUNCSTAT:
					pgstat_recv_funcstat(&msg.msg_funcstat, len);
					break;

				case PGSTAT_MTYPE_FUNCPURGE:
					pgstat_recv_funcpurge(&msg.msg_funcpurge, len);
					break;

				case PGSTAT_MTYPE_RECOVERYCONFLICT:
					pgstat_recv_recoveryconflict(&msg.msg_recoveryconflict,
												 len);
					break;

				case PGSTAT_MTYPE_DEADLOCK:
					pgstat_recv_deadlock(&msg.msg_deadlock, len);
					break;

				case PGSTAT_MTYPE_TEMPFILE:
					pgstat_recv_tempfile(&msg.msg_tempfile, len);
					break;

				case PGSTAT_MTYPE_CHECKSUMFAILURE:
					pgstat_recv_checksum_failure(&msg.msg_checksumfailure,
												 len);
					break;

				case PGSTAT_MTYPE_REPLSLOT:
					pgstat_recv_replslot(&msg.msg_replslot, len);
					break;

				case PGSTAT_MTYPE_CONNECT:
					pgstat_recv_connect(&msg.msg_connect, len);
					break;

				case PGSTAT_MTYPE_DISCONNECT:
					pgstat_recv_disconnect(&msg.msg_disconnect, len);
					break;

				default:
					break;
			}
		}						/* end of inner message-processing loop */

		/* Sleep until there's something to do */
#ifndef WIN32
		wr = WaitEventSetWait(wes, -1L, &event, 1, WAIT_EVENT_PGSTAT_MAIN);
#else

		/*
		 * Windows, at least in its Windows Server 2003 R2 incarnation,
		 * sometimes loses FD_READ events.  Waking up and retrying the recv()
		 * fixes that, so don't sleep indefinitely.  This is a crock of the
		 * first water, but until somebody wants to debug exactly what's
		 * happening there, this is the best we can do.  The two-second
		 * timeout matches our pre-9.2 behavior, and needs to be short enough
		 * to not provoke "using stale statistics" complaints from
		 * backend_read_statsfile.
		 */
		wr = WaitEventSetWait(wes, 2 * 1000L /* msec */ , &event, 1,
							  WAIT_EVENT_PGSTAT_MAIN);
#endif

		/*
		 * Emergency bailout if postmaster has died.  This is to avoid the
		 * necessity for manual cleanup of all postmaster children.
		 */
		if (wr == 1 && event.events == WL_POSTMASTER_DEATH)
			break;
	}							/* end of outer loop */

	/*
	 * Save the final stats to reuse at next startup.  Both arguments are
	 * true: write the permanent files, and write all databases.
	 */
	pgstat_write_statsfiles(true, true);

	FreeWaitEventSet(wes);

	exit(0);
}
3459 
3460 /*
3461  * Subroutine to clear stats in a database entry
3462  *
3463  * Tables and functions hashes are initialized to empty.
3464  */
3465 static void
reset_dbentry_counters(PgStat_StatDBEntry * dbentry)3466 reset_dbentry_counters(PgStat_StatDBEntry *dbentry)
3467 {
3468 	HASHCTL		hash_ctl;
3469 
3470 	dbentry->n_xact_commit = 0;
3471 	dbentry->n_xact_rollback = 0;
3472 	dbentry->n_blocks_fetched = 0;
3473 	dbentry->n_blocks_hit = 0;
3474 	dbentry->n_tuples_returned = 0;
3475 	dbentry->n_tuples_fetched = 0;
3476 	dbentry->n_tuples_inserted = 0;
3477 	dbentry->n_tuples_updated = 0;
3478 	dbentry->n_tuples_deleted = 0;
3479 	dbentry->last_autovac_time = 0;
3480 	dbentry->n_conflict_tablespace = 0;
3481 	dbentry->n_conflict_lock = 0;
3482 	dbentry->n_conflict_snapshot = 0;
3483 	dbentry->n_conflict_bufferpin = 0;
3484 	dbentry->n_conflict_startup_deadlock = 0;
3485 	dbentry->n_temp_files = 0;
3486 	dbentry->n_temp_bytes = 0;
3487 	dbentry->n_deadlocks = 0;
3488 	dbentry->n_checksum_failures = 0;
3489 	dbentry->last_checksum_failure = 0;
3490 	dbentry->n_block_read_time = 0;
3491 	dbentry->n_block_write_time = 0;
3492 	dbentry->n_sessions = 0;
3493 	dbentry->total_session_time = 0;
3494 	dbentry->total_active_time = 0;
3495 	dbentry->total_idle_in_xact_time = 0;
3496 	dbentry->n_sessions_abandoned = 0;
3497 	dbentry->n_sessions_fatal = 0;
3498 	dbentry->n_sessions_killed = 0;
3499 
3500 	dbentry->stat_reset_timestamp = GetCurrentTimestamp();
3501 	dbentry->stats_timestamp = 0;
3502 
3503 	hash_ctl.keysize = sizeof(Oid);
3504 	hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
3505 	dbentry->tables = hash_create("Per-database table",
3506 								  PGSTAT_TAB_HASH_SIZE,
3507 								  &hash_ctl,
3508 								  HASH_ELEM | HASH_BLOBS);
3509 
3510 	hash_ctl.keysize = sizeof(Oid);
3511 	hash_ctl.entrysize = sizeof(PgStat_StatFuncEntry);
3512 	dbentry->functions = hash_create("Per-database function",
3513 									 PGSTAT_FUNCTION_HASH_SIZE,
3514 									 &hash_ctl,
3515 									 HASH_ELEM | HASH_BLOBS);
3516 }
3517 
3518 /*
3519  * Lookup the hash table entry for the specified database. If no hash
3520  * table entry exists, initialize it, if the create parameter is true.
3521  * Else, return NULL.
3522  */
3523 static PgStat_StatDBEntry *
pgstat_get_db_entry(Oid databaseid,bool create)3524 pgstat_get_db_entry(Oid databaseid, bool create)
3525 {
3526 	PgStat_StatDBEntry *result;
3527 	bool		found;
3528 	HASHACTION	action = (create ? HASH_ENTER : HASH_FIND);
3529 
3530 	/* Lookup or create the hash table entry for this database */
3531 	result = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
3532 												&databaseid,
3533 												action, &found);
3534 
3535 	if (!create && !found)
3536 		return NULL;
3537 
3538 	/*
3539 	 * If not found, initialize the new one.  This creates empty hash tables
3540 	 * for tables and functions, too.
3541 	 */
3542 	if (!found)
3543 		reset_dbentry_counters(result);
3544 
3545 	return result;
3546 }
3547 
3548 
3549 /*
3550  * Lookup the hash table entry for the specified table. If no hash
3551  * table entry exists, initialize it, if the create parameter is true.
3552  * Else, return NULL.
3553  */
3554 static PgStat_StatTabEntry *
pgstat_get_tab_entry(PgStat_StatDBEntry * dbentry,Oid tableoid,bool create)3555 pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, Oid tableoid, bool create)
3556 {
3557 	PgStat_StatTabEntry *result;
3558 	bool		found;
3559 	HASHACTION	action = (create ? HASH_ENTER : HASH_FIND);
3560 
3561 	/* Lookup or create the hash table entry for this table */
3562 	result = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
3563 												 &tableoid,
3564 												 action, &found);
3565 
3566 	if (!create && !found)
3567 		return NULL;
3568 
3569 	/* If not found, initialize the new one. */
3570 	if (!found)
3571 	{
3572 		result->numscans = 0;
3573 		result->tuples_returned = 0;
3574 		result->tuples_fetched = 0;
3575 		result->tuples_inserted = 0;
3576 		result->tuples_updated = 0;
3577 		result->tuples_deleted = 0;
3578 		result->tuples_hot_updated = 0;
3579 		result->n_live_tuples = 0;
3580 		result->n_dead_tuples = 0;
3581 		result->changes_since_analyze = 0;
3582 		result->inserts_since_vacuum = 0;
3583 		result->blocks_fetched = 0;
3584 		result->blocks_hit = 0;
3585 		result->vacuum_timestamp = 0;
3586 		result->vacuum_count = 0;
3587 		result->autovac_vacuum_timestamp = 0;
3588 		result->autovac_vacuum_count = 0;
3589 		result->analyze_timestamp = 0;
3590 		result->analyze_count = 0;
3591 		result->autovac_analyze_timestamp = 0;
3592 		result->autovac_analyze_count = 0;
3593 	}
3594 
3595 	return result;
3596 }
3597 
3598 
3599 /* ----------
3600  * pgstat_write_statsfiles() -
3601  *		Write the global statistics file, as well as requested DB files.
3602  *
3603  *	'permanent' specifies writing to the permanent files not temporary ones.
3604  *	When true (happens only when the collector is shutting down), also remove
3605  *	the temporary files so that backends starting up under a new postmaster
3606  *	can't read old data before the new collector is ready.
3607  *
3608  *	When 'allDbs' is false, only the requested databases (listed in
3609  *	pending_write_requests) will be written; otherwise, all databases
3610  *	will be written.
3611  * ----------
3612  */
3613 static void
pgstat_write_statsfiles(bool permanent,bool allDbs)3614 pgstat_write_statsfiles(bool permanent, bool allDbs)
3615 {
3616 	HASH_SEQ_STATUS hstat;
3617 	PgStat_StatDBEntry *dbentry;
3618 	FILE	   *fpout;
3619 	int32		format_id;
3620 	const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname;
3621 	const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
3622 	int			rc;
3623 
3624 	elog(DEBUG2, "writing stats file \"%s\"", statfile);
3625 
3626 	/*
3627 	 * Open the statistics temp file to write out the current values.
3628 	 */
3629 	fpout = AllocateFile(tmpfile, PG_BINARY_W);
3630 	if (fpout == NULL)
3631 	{
3632 		ereport(LOG,
3633 				(errcode_for_file_access(),
3634 				 errmsg("could not open temporary statistics file \"%s\": %m",
3635 						tmpfile)));
3636 		return;
3637 	}
3638 
3639 	/*
3640 	 * Set the timestamp of the stats file.
3641 	 */
3642 	globalStats.stats_timestamp = GetCurrentTimestamp();
3643 
3644 	/*
3645 	 * Write the file header --- currently just a format ID.
3646 	 */
3647 	format_id = PGSTAT_FILE_FORMAT_ID;
3648 	rc = fwrite(&format_id, sizeof(format_id), 1, fpout);
3649 	(void) rc;					/* we'll check for error with ferror */
3650 
3651 	/*
3652 	 * Write global stats struct
3653 	 */
3654 	rc = fwrite(&globalStats, sizeof(globalStats), 1, fpout);
3655 	(void) rc;					/* we'll check for error with ferror */
3656 
3657 	/*
3658 	 * Write archiver stats struct
3659 	 */
3660 	rc = fwrite(&archiverStats, sizeof(archiverStats), 1, fpout);
3661 	(void) rc;					/* we'll check for error with ferror */
3662 
3663 	/*
3664 	 * Write WAL stats struct
3665 	 */
3666 	rc = fwrite(&walStats, sizeof(walStats), 1, fpout);
3667 	(void) rc;					/* we'll check for error with ferror */
3668 
3669 	/*
3670 	 * Write SLRU stats struct
3671 	 */
3672 	rc = fwrite(slruStats, sizeof(slruStats), 1, fpout);
3673 	(void) rc;					/* we'll check for error with ferror */
3674 
3675 	/*
3676 	 * Walk through the database table.
3677 	 */
3678 	hash_seq_init(&hstat, pgStatDBHash);
3679 	while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
3680 	{
3681 		/*
3682 		 * Write out the table and function stats for this DB into the
3683 		 * appropriate per-DB stat file, if required.
3684 		 */
3685 		if (allDbs || pgstat_db_requested(dbentry->databaseid))
3686 		{
3687 			/* Make DB's timestamp consistent with the global stats */
3688 			dbentry->stats_timestamp = globalStats.stats_timestamp;
3689 
3690 			pgstat_write_db_statsfile(dbentry, permanent);
3691 		}
3692 
3693 		/*
3694 		 * Write out the DB entry. We don't write the tables or functions
3695 		 * pointers, since they're of no use to any other process.
3696 		 */
3697 		fputc('D', fpout);
3698 		rc = fwrite(dbentry, offsetof(PgStat_StatDBEntry, tables), 1, fpout);
3699 		(void) rc;				/* we'll check for error with ferror */
3700 	}
3701 
3702 	/*
3703 	 * Write replication slot stats struct
3704 	 */
3705 	if (replSlotStatHash)
3706 	{
3707 		PgStat_StatReplSlotEntry *slotent;
3708 
3709 		hash_seq_init(&hstat, replSlotStatHash);
3710 		while ((slotent = (PgStat_StatReplSlotEntry *) hash_seq_search(&hstat)) != NULL)
3711 		{
3712 			fputc('R', fpout);
3713 			rc = fwrite(slotent, sizeof(PgStat_StatReplSlotEntry), 1, fpout);
3714 			(void) rc;			/* we'll check for error with ferror */
3715 		}
3716 	}
3717 
3718 	/*
3719 	 * No more output to be done. Close the temp file and replace the old
3720 	 * pgstat.stat with it.  The ferror() check replaces testing for error
3721 	 * after each individual fputc or fwrite above.
3722 	 */
3723 	fputc('E', fpout);
3724 
3725 	if (ferror(fpout))
3726 	{
3727 		ereport(LOG,
3728 				(errcode_for_file_access(),
3729 				 errmsg("could not write temporary statistics file \"%s\": %m",
3730 						tmpfile)));
3731 		FreeFile(fpout);
3732 		unlink(tmpfile);
3733 	}
3734 	else if (FreeFile(fpout) < 0)
3735 	{
3736 		ereport(LOG,
3737 				(errcode_for_file_access(),
3738 				 errmsg("could not close temporary statistics file \"%s\": %m",
3739 						tmpfile)));
3740 		unlink(tmpfile);
3741 	}
3742 	else if (rename(tmpfile, statfile) < 0)
3743 	{
3744 		ereport(LOG,
3745 				(errcode_for_file_access(),
3746 				 errmsg("could not rename temporary statistics file \"%s\" to \"%s\": %m",
3747 						tmpfile, statfile)));
3748 		unlink(tmpfile);
3749 	}
3750 
3751 	if (permanent)
3752 		unlink(pgstat_stat_filename);
3753 
3754 	/*
3755 	 * Now throw away the list of requests.  Note that requests sent after we
3756 	 * started the write are still waiting on the network socket.
3757 	 */
3758 	list_free(pending_write_requests);
3759 	pending_write_requests = NIL;
3760 }
3761 
3762 /*
3763  * return the filename for a DB stat file; filename is the output buffer,
3764  * of length len.
3765  */
3766 static void
get_dbstat_filename(bool permanent,bool tempname,Oid databaseid,char * filename,int len)3767 get_dbstat_filename(bool permanent, bool tempname, Oid databaseid,
3768 					char *filename, int len)
3769 {
3770 	int			printed;
3771 
3772 	/* NB -- pgstat_reset_remove_files knows about the pattern this uses */
3773 	printed = snprintf(filename, len, "%s/db_%u.%s",
3774 					   permanent ? PGSTAT_STAT_PERMANENT_DIRECTORY :
3775 					   pgstat_stat_directory,
3776 					   databaseid,
3777 					   tempname ? "tmp" : "stat");
3778 	if (printed >= len)
3779 		elog(ERROR, "overlength pgstat path");
3780 }
3781 
3782 /* ----------
3783  * pgstat_write_db_statsfile() -
3784  *		Write the stat file for a single database.
3785  *
3786  *	If writing to the permanent file (happens when the collector is
3787  *	shutting down only), remove the temporary file so that backends
3788  *	starting up under a new postmaster can't read the old data before
3789  *	the new collector is ready.
3790  * ----------
3791  */
3792 static void
pgstat_write_db_statsfile(PgStat_StatDBEntry * dbentry,bool permanent)3793 pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent)
3794 {
3795 	HASH_SEQ_STATUS tstat;
3796 	HASH_SEQ_STATUS fstat;
3797 	PgStat_StatTabEntry *tabentry;
3798 	PgStat_StatFuncEntry *funcentry;
3799 	FILE	   *fpout;
3800 	int32		format_id;
3801 	Oid			dbid = dbentry->databaseid;
3802 	int			rc;
3803 	char		tmpfile[MAXPGPATH];
3804 	char		statfile[MAXPGPATH];
3805 
3806 	get_dbstat_filename(permanent, true, dbid, tmpfile, MAXPGPATH);
3807 	get_dbstat_filename(permanent, false, dbid, statfile, MAXPGPATH);
3808 
3809 	elog(DEBUG2, "writing stats file \"%s\"", statfile);
3810 
3811 	/*
3812 	 * Open the statistics temp file to write out the current values.
3813 	 */
3814 	fpout = AllocateFile(tmpfile, PG_BINARY_W);
3815 	if (fpout == NULL)
3816 	{
3817 		ereport(LOG,
3818 				(errcode_for_file_access(),
3819 				 errmsg("could not open temporary statistics file \"%s\": %m",
3820 						tmpfile)));
3821 		return;
3822 	}
3823 
3824 	/*
3825 	 * Write the file header --- currently just a format ID.
3826 	 */
3827 	format_id = PGSTAT_FILE_FORMAT_ID;
3828 	rc = fwrite(&format_id, sizeof(format_id), 1, fpout);
3829 	(void) rc;					/* we'll check for error with ferror */
3830 
3831 	/*
3832 	 * Walk through the database's access stats per table.
3833 	 */
3834 	hash_seq_init(&tstat, dbentry->tables);
3835 	while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&tstat)) != NULL)
3836 	{
3837 		fputc('T', fpout);
3838 		rc = fwrite(tabentry, sizeof(PgStat_StatTabEntry), 1, fpout);
3839 		(void) rc;				/* we'll check for error with ferror */
3840 	}
3841 
3842 	/*
3843 	 * Walk through the database's function stats table.
3844 	 */
3845 	hash_seq_init(&fstat, dbentry->functions);
3846 	while ((funcentry = (PgStat_StatFuncEntry *) hash_seq_search(&fstat)) != NULL)
3847 	{
3848 		fputc('F', fpout);
3849 		rc = fwrite(funcentry, sizeof(PgStat_StatFuncEntry), 1, fpout);
3850 		(void) rc;				/* we'll check for error with ferror */
3851 	}
3852 
3853 	/*
3854 	 * No more output to be done. Close the temp file and replace the old
3855 	 * pgstat.stat with it.  The ferror() check replaces testing for error
3856 	 * after each individual fputc or fwrite above.
3857 	 */
3858 	fputc('E', fpout);
3859 
3860 	if (ferror(fpout))
3861 	{
3862 		ereport(LOG,
3863 				(errcode_for_file_access(),
3864 				 errmsg("could not write temporary statistics file \"%s\": %m",
3865 						tmpfile)));
3866 		FreeFile(fpout);
3867 		unlink(tmpfile);
3868 	}
3869 	else if (FreeFile(fpout) < 0)
3870 	{
3871 		ereport(LOG,
3872 				(errcode_for_file_access(),
3873 				 errmsg("could not close temporary statistics file \"%s\": %m",
3874 						tmpfile)));
3875 		unlink(tmpfile);
3876 	}
3877 	else if (rename(tmpfile, statfile) < 0)
3878 	{
3879 		ereport(LOG,
3880 				(errcode_for_file_access(),
3881 				 errmsg("could not rename temporary statistics file \"%s\" to \"%s\": %m",
3882 						tmpfile, statfile)));
3883 		unlink(tmpfile);
3884 	}
3885 
3886 	if (permanent)
3887 	{
3888 		get_dbstat_filename(false, false, dbid, statfile, MAXPGPATH);
3889 
3890 		elog(DEBUG2, "removing temporary stats file \"%s\"", statfile);
3891 		unlink(statfile);
3892 	}
3893 }
3894 
3895 /* ----------
3896  * pgstat_read_statsfiles() -
3897  *
3898  *	Reads in some existing statistics collector files and returns the
3899  *	databases hash table that is the top level of the data.
3900  *
3901  *	If 'onlydb' is not InvalidOid, it means we only want data for that DB
3902  *	plus the shared catalogs ("DB 0").  We'll still populate the DB hash
3903  *	table for all databases, but we don't bother even creating table/function
3904  *	hash tables for other databases.
3905  *
3906  *	'permanent' specifies reading from the permanent files not temporary ones.
3907  *	When true (happens only when the collector is starting up), remove the
3908  *	files after reading; the in-memory status is now authoritative, and the
3909  *	files would be out of date in case somebody else reads them.
3910  *
3911  *	If a 'deep' read is requested, table/function stats are read, otherwise
3912  *	the table/function hash tables remain empty.
3913  * ----------
3914  */
3915 static HTAB *
pgstat_read_statsfiles(Oid onlydb,bool permanent,bool deep)3916 pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
3917 {
3918 	PgStat_StatDBEntry *dbentry;
3919 	PgStat_StatDBEntry dbbuf;
3920 	HASHCTL		hash_ctl;
3921 	HTAB	   *dbhash;
3922 	FILE	   *fpin;
3923 	int32		format_id;
3924 	bool		found;
3925 	const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
3926 	int			i;
3927 
3928 	/*
3929 	 * The tables will live in pgStatLocalContext.
3930 	 */
3931 	pgstat_setup_memcxt();
3932 
3933 	/*
3934 	 * Create the DB hashtable
3935 	 */
3936 	hash_ctl.keysize = sizeof(Oid);
3937 	hash_ctl.entrysize = sizeof(PgStat_StatDBEntry);
3938 	hash_ctl.hcxt = pgStatLocalContext;
3939 	dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
3940 						 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
3941 
3942 	/*
3943 	 * Clear out global, archiver, WAL and SLRU statistics so they start from
3944 	 * zero in case we can't load an existing statsfile.
3945 	 */
3946 	memset(&globalStats, 0, sizeof(globalStats));
3947 	memset(&archiverStats, 0, sizeof(archiverStats));
3948 	memset(&walStats, 0, sizeof(walStats));
3949 	memset(&slruStats, 0, sizeof(slruStats));
3950 
3951 	/*
3952 	 * Set the current timestamp (will be kept only in case we can't load an
3953 	 * existing statsfile).
3954 	 */
3955 	globalStats.stat_reset_timestamp = GetCurrentTimestamp();
3956 	archiverStats.stat_reset_timestamp = globalStats.stat_reset_timestamp;
3957 	walStats.stat_reset_timestamp = globalStats.stat_reset_timestamp;
3958 
3959 	/*
3960 	 * Set the same reset timestamp for all SLRU items too.
3961 	 */
3962 	for (i = 0; i < SLRU_NUM_ELEMENTS; i++)
3963 		slruStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp;
3964 
3965 	/*
3966 	 * Try to open the stats file. If it doesn't exist, the backends simply
3967 	 * return zero for anything and the collector simply starts from scratch
3968 	 * with empty counters.
3969 	 *
3970 	 * ENOENT is a possibility if the stats collector is not running or has
3971 	 * not yet written the stats file the first time.  Any other failure
3972 	 * condition is suspicious.
3973 	 */
3974 	if ((fpin = AllocateFile(statfile, PG_BINARY_R)) == NULL)
3975 	{
3976 		if (errno != ENOENT)
3977 			ereport(pgStatRunningInCollector ? LOG : WARNING,
3978 					(errcode_for_file_access(),
3979 					 errmsg("could not open statistics file \"%s\": %m",
3980 							statfile)));
3981 		return dbhash;
3982 	}
3983 
3984 	/*
3985 	 * Verify it's of the expected format.
3986 	 */
3987 	if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) ||
3988 		format_id != PGSTAT_FILE_FORMAT_ID)
3989 	{
3990 		ereport(pgStatRunningInCollector ? LOG : WARNING,
3991 				(errmsg("corrupted statistics file \"%s\"", statfile)));
3992 		goto done;
3993 	}
3994 
3995 	/*
3996 	 * Read global stats struct
3997 	 */
3998 	if (fread(&globalStats, 1, sizeof(globalStats), fpin) != sizeof(globalStats))
3999 	{
4000 		ereport(pgStatRunningInCollector ? LOG : WARNING,
4001 				(errmsg("corrupted statistics file \"%s\"", statfile)));
4002 		memset(&globalStats, 0, sizeof(globalStats));
4003 		goto done;
4004 	}
4005 
4006 	/*
4007 	 * In the collector, disregard the timestamp we read from the permanent
4008 	 * stats file; we should be willing to write a temp stats file immediately
4009 	 * upon the first request from any backend.  This only matters if the old
4010 	 * file's timestamp is less than PGSTAT_STAT_INTERVAL ago, but that's not
4011 	 * an unusual scenario.
4012 	 */
4013 	if (pgStatRunningInCollector)
4014 		globalStats.stats_timestamp = 0;
4015 
4016 	/*
4017 	 * Read archiver stats struct
4018 	 */
4019 	if (fread(&archiverStats, 1, sizeof(archiverStats), fpin) != sizeof(archiverStats))
4020 	{
4021 		ereport(pgStatRunningInCollector ? LOG : WARNING,
4022 				(errmsg("corrupted statistics file \"%s\"", statfile)));
4023 		memset(&archiverStats, 0, sizeof(archiverStats));
4024 		goto done;
4025 	}
4026 
4027 	/*
4028 	 * Read WAL stats struct
4029 	 */
4030 	if (fread(&walStats, 1, sizeof(walStats), fpin) != sizeof(walStats))
4031 	{
4032 		ereport(pgStatRunningInCollector ? LOG : WARNING,
4033 				(errmsg("corrupted statistics file \"%s\"", statfile)));
4034 		memset(&walStats, 0, sizeof(walStats));
4035 		goto done;
4036 	}
4037 
4038 	/*
4039 	 * Read SLRU stats struct
4040 	 */
4041 	if (fread(slruStats, 1, sizeof(slruStats), fpin) != sizeof(slruStats))
4042 	{
4043 		ereport(pgStatRunningInCollector ? LOG : WARNING,
4044 				(errmsg("corrupted statistics file \"%s\"", statfile)));
4045 		memset(&slruStats, 0, sizeof(slruStats));
4046 		goto done;
4047 	}
4048 
4049 	/*
4050 	 * We found an existing collector stats file. Read it and put all the
4051 	 * hashtable entries into place.
4052 	 */
4053 	for (;;)
4054 	{
4055 		switch (fgetc(fpin))
4056 		{
4057 				/*
4058 				 * 'D'	A PgStat_StatDBEntry struct describing a database
4059 				 * follows.
4060 				 */
4061 			case 'D':
4062 				if (fread(&dbbuf, 1, offsetof(PgStat_StatDBEntry, tables),
4063 						  fpin) != offsetof(PgStat_StatDBEntry, tables))
4064 				{
4065 					ereport(pgStatRunningInCollector ? LOG : WARNING,
4066 							(errmsg("corrupted statistics file \"%s\"",
4067 									statfile)));
4068 					goto done;
4069 				}
4070 
4071 				/*
4072 				 * Add to the DB hash
4073 				 */
4074 				dbentry = (PgStat_StatDBEntry *) hash_search(dbhash,
4075 															 (void *) &dbbuf.databaseid,
4076 															 HASH_ENTER,
4077 															 &found);
4078 				if (found)
4079 				{
4080 					ereport(pgStatRunningInCollector ? LOG : WARNING,
4081 							(errmsg("corrupted statistics file \"%s\"",
4082 									statfile)));
4083 					goto done;
4084 				}
4085 
4086 				memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry));
4087 				dbentry->tables = NULL;
4088 				dbentry->functions = NULL;
4089 
4090 				/*
4091 				 * In the collector, disregard the timestamp we read from the
4092 				 * permanent stats file; we should be willing to write a temp
4093 				 * stats file immediately upon the first request from any
4094 				 * backend.
4095 				 */
4096 				if (pgStatRunningInCollector)
4097 					dbentry->stats_timestamp = 0;
4098 
4099 				/*
4100 				 * Don't create tables/functions hashtables for uninteresting
4101 				 * databases.
4102 				 */
4103 				if (onlydb != InvalidOid)
4104 				{
4105 					if (dbbuf.databaseid != onlydb &&
4106 						dbbuf.databaseid != InvalidOid)
4107 						break;
4108 				}
4109 
4110 				hash_ctl.keysize = sizeof(Oid);
4111 				hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
4112 				hash_ctl.hcxt = pgStatLocalContext;
4113 				dbentry->tables = hash_create("Per-database table",
4114 											  PGSTAT_TAB_HASH_SIZE,
4115 											  &hash_ctl,
4116 											  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
4117 
4118 				hash_ctl.keysize = sizeof(Oid);
4119 				hash_ctl.entrysize = sizeof(PgStat_StatFuncEntry);
4120 				hash_ctl.hcxt = pgStatLocalContext;
4121 				dbentry->functions = hash_create("Per-database function",
4122 												 PGSTAT_FUNCTION_HASH_SIZE,
4123 												 &hash_ctl,
4124 												 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
4125 
4126 				/*
4127 				 * If requested, read the data from the database-specific
4128 				 * file.  Otherwise we just leave the hashtables empty.
4129 				 */
4130 				if (deep)
4131 					pgstat_read_db_statsfile(dbentry->databaseid,
4132 											 dbentry->tables,
4133 											 dbentry->functions,
4134 											 permanent);
4135 
4136 				break;
4137 
4138 				/*
4139 				 * 'R'	A PgStat_StatReplSlotEntry struct describing a
4140 				 * replication slot follows.
4141 				 */
4142 			case 'R':
4143 				{
4144 					PgStat_StatReplSlotEntry slotbuf;
4145 					PgStat_StatReplSlotEntry *slotent;
4146 
4147 					if (fread(&slotbuf, 1, sizeof(PgStat_StatReplSlotEntry), fpin)
4148 						!= sizeof(PgStat_StatReplSlotEntry))
4149 					{
4150 						ereport(pgStatRunningInCollector ? LOG : WARNING,
4151 								(errmsg("corrupted statistics file \"%s\"",
4152 										statfile)));
4153 						goto done;
4154 					}
4155 
4156 					/* Create hash table if we don't have it already. */
4157 					if (replSlotStatHash == NULL)
4158 					{
4159 						HASHCTL		hash_ctl;
4160 
4161 						hash_ctl.keysize = sizeof(NameData);
4162 						hash_ctl.entrysize = sizeof(PgStat_StatReplSlotEntry);
4163 						hash_ctl.hcxt = pgStatLocalContext;
4164 						replSlotStatHash = hash_create("Replication slots hash",
4165 													   PGSTAT_REPLSLOT_HASH_SIZE,
4166 													   &hash_ctl,
4167 													   HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
4168 					}
4169 
4170 					slotent = (PgStat_StatReplSlotEntry *) hash_search(replSlotStatHash,
4171 																	   (void *) &slotbuf.slotname,
4172 																	   HASH_ENTER, NULL);
4173 					memcpy(slotent, &slotbuf, sizeof(PgStat_StatReplSlotEntry));
4174 					break;
4175 				}
4176 
4177 			case 'E':
4178 				goto done;
4179 
4180 			default:
4181 				ereport(pgStatRunningInCollector ? LOG : WARNING,
4182 						(errmsg("corrupted statistics file \"%s\"",
4183 								statfile)));
4184 				goto done;
4185 		}
4186 	}
4187 
4188 done:
4189 	FreeFile(fpin);
4190 
4191 	/* If requested to read the permanent file, also get rid of it. */
4192 	if (permanent)
4193 	{
4194 		elog(DEBUG2, "removing permanent stats file \"%s\"", statfile);
4195 		unlink(statfile);
4196 	}
4197 
4198 	return dbhash;
4199 }
4200 
4201 
4202 /* ----------
4203  * pgstat_read_db_statsfile() -
4204  *
4205  *	Reads in the existing statistics collector file for the given database,
4206  *	filling the passed-in tables and functions hash tables.
4207  *
4208  *	As in pgstat_read_statsfiles, if the permanent file is requested, it is
4209  *	removed after reading.
4210  *
4211  *	Note: this code has the ability to skip storing per-table or per-function
4212  *	data, if NULL is passed for the corresponding hashtable.  That's not used
4213  *	at the moment though.
4214  * ----------
4215  */
4216 static void
pgstat_read_db_statsfile(Oid databaseid,HTAB * tabhash,HTAB * funchash,bool permanent)4217 pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
4218 						 bool permanent)
4219 {
4220 	PgStat_StatTabEntry *tabentry;
4221 	PgStat_StatTabEntry tabbuf;
4222 	PgStat_StatFuncEntry funcbuf;
4223 	PgStat_StatFuncEntry *funcentry;
4224 	FILE	   *fpin;
4225 	int32		format_id;
4226 	bool		found;
4227 	char		statfile[MAXPGPATH];
4228 
4229 	get_dbstat_filename(permanent, false, databaseid, statfile, MAXPGPATH);
4230 
4231 	/*
4232 	 * Try to open the stats file. If it doesn't exist, the backends simply
4233 	 * return zero for anything and the collector simply starts from scratch
4234 	 * with empty counters.
4235 	 *
4236 	 * ENOENT is a possibility if the stats collector is not running or has
4237 	 * not yet written the stats file the first time.  Any other failure
4238 	 * condition is suspicious.
4239 	 */
4240 	if ((fpin = AllocateFile(statfile, PG_BINARY_R)) == NULL)
4241 	{
4242 		if (errno != ENOENT)
4243 			ereport(pgStatRunningInCollector ? LOG : WARNING,
4244 					(errcode_for_file_access(),
4245 					 errmsg("could not open statistics file \"%s\": %m",
4246 							statfile)));
4247 		return;
4248 	}
4249 
4250 	/*
4251 	 * Verify it's of the expected format.
4252 	 */
4253 	if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) ||
4254 		format_id != PGSTAT_FILE_FORMAT_ID)
4255 	{
4256 		ereport(pgStatRunningInCollector ? LOG : WARNING,
4257 				(errmsg("corrupted statistics file \"%s\"", statfile)));
4258 		goto done;
4259 	}
4260 
4261 	/*
4262 	 * We found an existing collector stats file. Read it and put all the
4263 	 * hashtable entries into place.
4264 	 */
4265 	for (;;)
4266 	{
4267 		switch (fgetc(fpin))
4268 		{
4269 				/*
4270 				 * 'T'	A PgStat_StatTabEntry follows.
4271 				 */
4272 			case 'T':
4273 				if (fread(&tabbuf, 1, sizeof(PgStat_StatTabEntry),
4274 						  fpin) != sizeof(PgStat_StatTabEntry))
4275 				{
4276 					ereport(pgStatRunningInCollector ? LOG : WARNING,
4277 							(errmsg("corrupted statistics file \"%s\"",
4278 									statfile)));
4279 					goto done;
4280 				}
4281 
4282 				/*
4283 				 * Skip if table data not wanted.
4284 				 */
4285 				if (tabhash == NULL)
4286 					break;
4287 
4288 				tabentry = (PgStat_StatTabEntry *) hash_search(tabhash,
4289 															   (void *) &tabbuf.tableid,
4290 															   HASH_ENTER, &found);
4291 
4292 				if (found)
4293 				{
4294 					ereport(pgStatRunningInCollector ? LOG : WARNING,
4295 							(errmsg("corrupted statistics file \"%s\"",
4296 									statfile)));
4297 					goto done;
4298 				}
4299 
4300 				memcpy(tabentry, &tabbuf, sizeof(tabbuf));
4301 				break;
4302 
4303 				/*
4304 				 * 'F'	A PgStat_StatFuncEntry follows.
4305 				 */
4306 			case 'F':
4307 				if (fread(&funcbuf, 1, sizeof(PgStat_StatFuncEntry),
4308 						  fpin) != sizeof(PgStat_StatFuncEntry))
4309 				{
4310 					ereport(pgStatRunningInCollector ? LOG : WARNING,
4311 							(errmsg("corrupted statistics file \"%s\"",
4312 									statfile)));
4313 					goto done;
4314 				}
4315 
4316 				/*
4317 				 * Skip if function data not wanted.
4318 				 */
4319 				if (funchash == NULL)
4320 					break;
4321 
4322 				funcentry = (PgStat_StatFuncEntry *) hash_search(funchash,
4323 																 (void *) &funcbuf.functionid,
4324 																 HASH_ENTER, &found);
4325 
4326 				if (found)
4327 				{
4328 					ereport(pgStatRunningInCollector ? LOG : WARNING,
4329 							(errmsg("corrupted statistics file \"%s\"",
4330 									statfile)));
4331 					goto done;
4332 				}
4333 
4334 				memcpy(funcentry, &funcbuf, sizeof(funcbuf));
4335 				break;
4336 
4337 				/*
4338 				 * 'E'	The EOF marker of a complete stats file.
4339 				 */
4340 			case 'E':
4341 				goto done;
4342 
4343 			default:
4344 				ereport(pgStatRunningInCollector ? LOG : WARNING,
4345 						(errmsg("corrupted statistics file \"%s\"",
4346 								statfile)));
4347 				goto done;
4348 		}
4349 	}
4350 
4351 done:
4352 	FreeFile(fpin);
4353 
4354 	if (permanent)
4355 	{
4356 		elog(DEBUG2, "removing permanent stats file \"%s\"", statfile);
4357 		unlink(statfile);
4358 	}
4359 }
4360 
4361 /* ----------
4362  * pgstat_read_db_statsfile_timestamp() -
4363  *
4364  *	Attempt to determine the timestamp of the last db statfile write.
4365  *	Returns true if successful; the timestamp is stored in *ts. The caller must
4366  *	rely on timestamp stored in *ts iff the function returns true.
4367  *
4368  *	This needs to be careful about handling databases for which no stats file
4369  *	exists, such as databases without a stat entry or those not yet written:
4370  *
4371  *	- if there's a database entry in the global file, return the corresponding
4372  *	stats_timestamp value.
4373  *
4374  *	- if there's no db stat entry (e.g. for a new or inactive database),
4375  *	there's no stats_timestamp value, but also nothing to write so we return
4376  *	the timestamp of the global statfile.
4377  * ----------
4378  */
4379 static bool
pgstat_read_db_statsfile_timestamp(Oid databaseid,bool permanent,TimestampTz * ts)4380 pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
4381 								   TimestampTz *ts)
4382 {
4383 	PgStat_StatDBEntry dbentry;
4384 	PgStat_GlobalStats myGlobalStats;
4385 	PgStat_ArchiverStats myArchiverStats;
4386 	PgStat_WalStats myWalStats;
4387 	PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS];
4388 	PgStat_StatReplSlotEntry myReplSlotStats;
4389 	FILE	   *fpin;
4390 	int32		format_id;
4391 	const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
4392 
4393 	/*
4394 	 * Try to open the stats file.  As above, anything but ENOENT is worthy of
4395 	 * complaining about.
4396 	 */
4397 	if ((fpin = AllocateFile(statfile, PG_BINARY_R)) == NULL)
4398 	{
4399 		if (errno != ENOENT)
4400 			ereport(pgStatRunningInCollector ? LOG : WARNING,
4401 					(errcode_for_file_access(),
4402 					 errmsg("could not open statistics file \"%s\": %m",
4403 							statfile)));
4404 		return false;
4405 	}
4406 
4407 	/*
4408 	 * Verify it's of the expected format.
4409 	 */
4410 	if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) ||
4411 		format_id != PGSTAT_FILE_FORMAT_ID)
4412 	{
4413 		ereport(pgStatRunningInCollector ? LOG : WARNING,
4414 				(errmsg("corrupted statistics file \"%s\"", statfile)));
4415 		FreeFile(fpin);
4416 		return false;
4417 	}
4418 
4419 	/*
4420 	 * Read global stats struct
4421 	 */
4422 	if (fread(&myGlobalStats, 1, sizeof(myGlobalStats),
4423 			  fpin) != sizeof(myGlobalStats))
4424 	{
4425 		ereport(pgStatRunningInCollector ? LOG : WARNING,
4426 				(errmsg("corrupted statistics file \"%s\"", statfile)));
4427 		FreeFile(fpin);
4428 		return false;
4429 	}
4430 
4431 	/*
4432 	 * Read archiver stats struct
4433 	 */
4434 	if (fread(&myArchiverStats, 1, sizeof(myArchiverStats),
4435 			  fpin) != sizeof(myArchiverStats))
4436 	{
4437 		ereport(pgStatRunningInCollector ? LOG : WARNING,
4438 				(errmsg("corrupted statistics file \"%s\"", statfile)));
4439 		FreeFile(fpin);
4440 		return false;
4441 	}
4442 
4443 	/*
4444 	 * Read WAL stats struct
4445 	 */
4446 	if (fread(&myWalStats, 1, sizeof(myWalStats), fpin) != sizeof(myWalStats))
4447 	{
4448 		ereport(pgStatRunningInCollector ? LOG : WARNING,
4449 				(errmsg("corrupted statistics file \"%s\"", statfile)));
4450 		FreeFile(fpin);
4451 		return false;
4452 	}
4453 
4454 	/*
4455 	 * Read SLRU stats struct
4456 	 */
4457 	if (fread(mySLRUStats, 1, sizeof(mySLRUStats), fpin) != sizeof(mySLRUStats))
4458 	{
4459 		ereport(pgStatRunningInCollector ? LOG : WARNING,
4460 				(errmsg("corrupted statistics file \"%s\"", statfile)));
4461 		FreeFile(fpin);
4462 		return false;
4463 	}
4464 
4465 	/* By default, we're going to return the timestamp of the global file. */
4466 	*ts = myGlobalStats.stats_timestamp;
4467 
4468 	/*
4469 	 * We found an existing collector stats file.  Read it and look for a
4470 	 * record for the requested database.  If found, use its timestamp.
4471 	 */
4472 	for (;;)
4473 	{
4474 		switch (fgetc(fpin))
4475 		{
4476 				/*
4477 				 * 'D'	A PgStat_StatDBEntry struct describing a database
4478 				 * follows.
4479 				 */
4480 			case 'D':
4481 				if (fread(&dbentry, 1, offsetof(PgStat_StatDBEntry, tables),
4482 						  fpin) != offsetof(PgStat_StatDBEntry, tables))
4483 				{
4484 					ereport(pgStatRunningInCollector ? LOG : WARNING,
4485 							(errmsg("corrupted statistics file \"%s\"",
4486 									statfile)));
4487 					FreeFile(fpin);
4488 					return false;
4489 				}
4490 
4491 				/*
4492 				 * If this is the DB we're looking for, save its timestamp and
4493 				 * we're done.
4494 				 */
4495 				if (dbentry.databaseid == databaseid)
4496 				{
4497 					*ts = dbentry.stats_timestamp;
4498 					goto done;
4499 				}
4500 
4501 				break;
4502 
4503 				/*
4504 				 * 'R'	A PgStat_StatReplSlotEntry struct describing a
4505 				 * replication slot follows.
4506 				 */
4507 			case 'R':
4508 				if (fread(&myReplSlotStats, 1, sizeof(PgStat_StatReplSlotEntry), fpin)
4509 					!= sizeof(PgStat_StatReplSlotEntry))
4510 				{
4511 					ereport(pgStatRunningInCollector ? LOG : WARNING,
4512 							(errmsg("corrupted statistics file \"%s\"",
4513 									statfile)));
4514 					FreeFile(fpin);
4515 					return false;
4516 				}
4517 				break;
4518 
4519 			case 'E':
4520 				goto done;
4521 
4522 			default:
4523 				{
4524 					ereport(pgStatRunningInCollector ? LOG : WARNING,
4525 							(errmsg("corrupted statistics file \"%s\"",
4526 									statfile)));
4527 					FreeFile(fpin);
4528 					return false;
4529 				}
4530 		}
4531 	}
4532 
4533 done:
4534 	FreeFile(fpin);
4535 	return true;
4536 }
4537 
4538 /*
4539  * If not already done, read the statistics collector stats file into
4540  * some hash tables.  The results will be kept until pgstat_clear_snapshot()
4541  * is called (typically, at end of transaction).
4542  */
4543 static void
backend_read_statsfile(void)4544 backend_read_statsfile(void)
4545 {
4546 	TimestampTz min_ts = 0;
4547 	TimestampTz ref_ts = 0;
4548 	Oid			inquiry_db;
4549 	int			count;
4550 
4551 	/* already read it? */
4552 	if (pgStatDBHash)
4553 		return;
4554 	Assert(!pgStatRunningInCollector);
4555 
4556 	/*
4557 	 * In a normal backend, we check staleness of the data for our own DB, and
4558 	 * so we send MyDatabaseId in inquiry messages.  In the autovac launcher,
4559 	 * check staleness of the shared-catalog data, and send InvalidOid in
4560 	 * inquiry messages so as not to force writing unnecessary data.
4561 	 */
4562 	if (IsAutoVacuumLauncherProcess())
4563 		inquiry_db = InvalidOid;
4564 	else
4565 		inquiry_db = MyDatabaseId;
4566 
4567 	/*
4568 	 * Loop until fresh enough stats file is available or we ran out of time.
4569 	 * The stats inquiry message is sent repeatedly in case collector drops
4570 	 * it; but not every single time, as that just swamps the collector.
4571 	 */
4572 	for (count = 0; count < PGSTAT_POLL_LOOP_COUNT; count++)
4573 	{
4574 		bool		ok;
4575 		TimestampTz file_ts = 0;
4576 		TimestampTz cur_ts;
4577 
4578 		CHECK_FOR_INTERRUPTS();
4579 
4580 		ok = pgstat_read_db_statsfile_timestamp(inquiry_db, false, &file_ts);
4581 
4582 		cur_ts = GetCurrentTimestamp();
4583 		/* Calculate min acceptable timestamp, if we didn't already */
4584 		if (count == 0 || cur_ts < ref_ts)
4585 		{
4586 			/*
4587 			 * We set the minimum acceptable timestamp to PGSTAT_STAT_INTERVAL
4588 			 * msec before now.  This indirectly ensures that the collector
4589 			 * needn't write the file more often than PGSTAT_STAT_INTERVAL. In
4590 			 * an autovacuum worker, however, we want a lower delay to avoid
4591 			 * using stale data, so we use PGSTAT_RETRY_DELAY (since the
4592 			 * number of workers is low, this shouldn't be a problem).
4593 			 *
4594 			 * We don't recompute min_ts after sleeping, except in the
4595 			 * unlikely case that cur_ts went backwards.  So we might end up
4596 			 * accepting a file a bit older than PGSTAT_STAT_INTERVAL.  In
4597 			 * practice that shouldn't happen, though, as long as the sleep
4598 			 * time is less than PGSTAT_STAT_INTERVAL; and we don't want to
4599 			 * tell the collector that our cutoff time is less than what we'd
4600 			 * actually accept.
4601 			 */
4602 			ref_ts = cur_ts;
4603 			if (IsAutoVacuumWorkerProcess())
4604 				min_ts = TimestampTzPlusMilliseconds(ref_ts,
4605 													 -PGSTAT_RETRY_DELAY);
4606 			else
4607 				min_ts = TimestampTzPlusMilliseconds(ref_ts,
4608 													 -PGSTAT_STAT_INTERVAL);
4609 		}
4610 
4611 		/*
4612 		 * If the file timestamp is actually newer than cur_ts, we must have
4613 		 * had a clock glitch (system time went backwards) or there is clock
4614 		 * skew between our processor and the stats collector's processor.
4615 		 * Accept the file, but send an inquiry message anyway to make
4616 		 * pgstat_recv_inquiry do a sanity check on the collector's time.
4617 		 */
4618 		if (ok && file_ts > cur_ts)
4619 		{
4620 			/*
4621 			 * A small amount of clock skew between processors isn't terribly
4622 			 * surprising, but a large difference is worth logging.  We
4623 			 * arbitrarily define "large" as 1000 msec.
4624 			 */
4625 			if (file_ts >= TimestampTzPlusMilliseconds(cur_ts, 1000))
4626 			{
4627 				char	   *filetime;
4628 				char	   *mytime;
4629 
4630 				/* Copy because timestamptz_to_str returns a static buffer */
4631 				filetime = pstrdup(timestamptz_to_str(file_ts));
4632 				mytime = pstrdup(timestamptz_to_str(cur_ts));
4633 				ereport(LOG,
4634 						(errmsg("statistics collector's time %s is later than backend local time %s",
4635 								filetime, mytime)));
4636 				pfree(filetime);
4637 				pfree(mytime);
4638 			}
4639 
4640 			pgstat_send_inquiry(cur_ts, min_ts, inquiry_db);
4641 			break;
4642 		}
4643 
4644 		/* Normal acceptance case: file is not older than cutoff time */
4645 		if (ok && file_ts >= min_ts)
4646 			break;
4647 
4648 		/* Not there or too old, so kick the collector and wait a bit */
4649 		if ((count % PGSTAT_INQ_LOOP_COUNT) == 0)
4650 			pgstat_send_inquiry(cur_ts, min_ts, inquiry_db);
4651 
4652 		pg_usleep(PGSTAT_RETRY_DELAY * 1000L);
4653 	}
4654 
4655 	if (count >= PGSTAT_POLL_LOOP_COUNT)
4656 		ereport(LOG,
4657 				(errmsg("using stale statistics instead of current ones "
4658 						"because stats collector is not responding")));
4659 
4660 	/*
4661 	 * Autovacuum launcher wants stats about all databases, but a shallow read
4662 	 * is sufficient.  Regular backends want a deep read for just the tables
4663 	 * they can see (MyDatabaseId + shared catalogs).
4664 	 */
4665 	if (IsAutoVacuumLauncherProcess())
4666 		pgStatDBHash = pgstat_read_statsfiles(InvalidOid, false, false);
4667 	else
4668 		pgStatDBHash = pgstat_read_statsfiles(MyDatabaseId, false, true);
4669 }
4670 
4671 
4672 /* ----------
4673  * pgstat_setup_memcxt() -
4674  *
4675  *	Create pgStatLocalContext, if not already done.
4676  * ----------
4677  */
4678 static void
pgstat_setup_memcxt(void)4679 pgstat_setup_memcxt(void)
4680 {
4681 	if (!pgStatLocalContext)
4682 		pgStatLocalContext = AllocSetContextCreate(TopMemoryContext,
4683 												   "Statistics snapshot",
4684 												   ALLOCSET_SMALL_SIZES);
4685 }
4686 
4687 
4688 /* ----------
4689  * pgstat_clear_snapshot() -
4690  *
4691  *	Discard any data collected in the current transaction.  Any subsequent
4692  *	request will cause new snapshots to be read.
4693  *
4694  *	This is also invoked during transaction commit or abort to discard
4695  *	the no-longer-wanted snapshot.
4696  * ----------
4697  */
4698 void
pgstat_clear_snapshot(void)4699 pgstat_clear_snapshot(void)
4700 {
4701 	/* Release memory, if any was allocated */
4702 	if (pgStatLocalContext)
4703 		MemoryContextDelete(pgStatLocalContext);
4704 
4705 	/* Reset variables */
4706 	pgStatLocalContext = NULL;
4707 	pgStatDBHash = NULL;
4708 	replSlotStatHash = NULL;
4709 
4710 	/*
4711 	 * Historically the backend_status.c facilities lived in this file, and
4712 	 * were reset with the same function. For now keep it that way, and
4713 	 * forward the reset request.
4714 	 */
4715 	pgstat_clear_backend_activity_snapshot();
4716 }
4717 
4718 
4719 /* ----------
4720  * pgstat_recv_inquiry() -
4721  *
4722  *	Process stat inquiry requests.
4723  * ----------
4724  */
4725 static void
pgstat_recv_inquiry(PgStat_MsgInquiry * msg,int len)4726 pgstat_recv_inquiry(PgStat_MsgInquiry *msg, int len)
4727 {
4728 	PgStat_StatDBEntry *dbentry;
4729 
4730 	elog(DEBUG2, "received inquiry for database %u", msg->databaseid);
4731 
4732 	/*
4733 	 * If there's already a write request for this DB, there's nothing to do.
4734 	 *
4735 	 * Note that if a request is found, we return early and skip the below
4736 	 * check for clock skew.  This is okay, since the only way for a DB
4737 	 * request to be present in the list is that we have been here since the
4738 	 * last write round.  It seems sufficient to check for clock skew once per
4739 	 * write round.
4740 	 */
4741 	if (list_member_oid(pending_write_requests, msg->databaseid))
4742 		return;
4743 
4744 	/*
4745 	 * Check to see if we last wrote this database at a time >= the requested
4746 	 * cutoff time.  If so, this is a stale request that was generated before
4747 	 * we updated the DB file, and we don't need to do so again.
4748 	 *
4749 	 * If the requestor's local clock time is older than stats_timestamp, we
4750 	 * should suspect a clock glitch, ie system time going backwards; though
4751 	 * the more likely explanation is just delayed message receipt.  It is
4752 	 * worth expending a GetCurrentTimestamp call to be sure, since a large
4753 	 * retreat in the system clock reading could otherwise cause us to neglect
4754 	 * to update the stats file for a long time.
4755 	 */
4756 	dbentry = pgstat_get_db_entry(msg->databaseid, false);
4757 	if (dbentry == NULL)
4758 	{
4759 		/*
4760 		 * We have no data for this DB.  Enter a write request anyway so that
4761 		 * the global stats will get updated.  This is needed to prevent
4762 		 * backend_read_statsfile from waiting for data that we cannot supply,
4763 		 * in the case of a new DB that nobody has yet reported any stats for.
4764 		 * See the behavior of pgstat_read_db_statsfile_timestamp.
4765 		 */
4766 	}
4767 	else if (msg->clock_time < dbentry->stats_timestamp)
4768 	{
4769 		TimestampTz cur_ts = GetCurrentTimestamp();
4770 
4771 		if (cur_ts < dbentry->stats_timestamp)
4772 		{
4773 			/*
4774 			 * Sure enough, time went backwards.  Force a new stats file write
4775 			 * to get back in sync; but first, log a complaint.
4776 			 */
4777 			char	   *writetime;
4778 			char	   *mytime;
4779 
4780 			/* Copy because timestamptz_to_str returns a static buffer */
4781 			writetime = pstrdup(timestamptz_to_str(dbentry->stats_timestamp));
4782 			mytime = pstrdup(timestamptz_to_str(cur_ts));
4783 			ereport(LOG,
4784 					(errmsg("stats_timestamp %s is later than collector's time %s for database %u",
4785 							writetime, mytime, dbentry->databaseid)));
4786 			pfree(writetime);
4787 			pfree(mytime);
4788 		}
4789 		else
4790 		{
4791 			/*
4792 			 * Nope, it's just an old request.  Assuming msg's clock_time is
4793 			 * >= its cutoff_time, it must be stale, so we can ignore it.
4794 			 */
4795 			return;
4796 		}
4797 	}
4798 	else if (msg->cutoff_time <= dbentry->stats_timestamp)
4799 	{
4800 		/* Stale request, ignore it */
4801 		return;
4802 	}
4803 
4804 	/*
4805 	 * We need to write this DB, so create a request.
4806 	 */
4807 	pending_write_requests = lappend_oid(pending_write_requests,
4808 										 msg->databaseid);
4809 }
4810 
4811 
4812 /* ----------
4813  * pgstat_recv_tabstat() -
4814  *
4815  *	Count what the backend has done.
4816  * ----------
4817  */
4818 static void
pgstat_recv_tabstat(PgStat_MsgTabstat * msg,int len)4819 pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len)
4820 {
4821 	PgStat_StatDBEntry *dbentry;
4822 	PgStat_StatTabEntry *tabentry;
4823 	int			i;
4824 	bool		found;
4825 
4826 	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
4827 
4828 	/*
4829 	 * Update database-wide stats.
4830 	 */
4831 	dbentry->n_xact_commit += (PgStat_Counter) (msg->m_xact_commit);
4832 	dbentry->n_xact_rollback += (PgStat_Counter) (msg->m_xact_rollback);
4833 	dbentry->n_block_read_time += msg->m_block_read_time;
4834 	dbentry->n_block_write_time += msg->m_block_write_time;
4835 
4836 	dbentry->total_session_time += msg->m_session_time;
4837 	dbentry->total_active_time += msg->m_active_time;
4838 	dbentry->total_idle_in_xact_time += msg->m_idle_in_xact_time;
4839 
4840 	/*
4841 	 * Process all table entries in the message.
4842 	 */
4843 	for (i = 0; i < msg->m_nentries; i++)
4844 	{
4845 		PgStat_TableEntry *tabmsg = &(msg->m_entry[i]);
4846 
4847 		tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
4848 													   (void *) &(tabmsg->t_id),
4849 													   HASH_ENTER, &found);
4850 
4851 		if (!found)
4852 		{
4853 			/*
4854 			 * If it's a new table entry, initialize counters to the values we
4855 			 * just got.
4856 			 */
4857 			tabentry->numscans = tabmsg->t_counts.t_numscans;
4858 			tabentry->tuples_returned = tabmsg->t_counts.t_tuples_returned;
4859 			tabentry->tuples_fetched = tabmsg->t_counts.t_tuples_fetched;
4860 			tabentry->tuples_inserted = tabmsg->t_counts.t_tuples_inserted;
4861 			tabentry->tuples_updated = tabmsg->t_counts.t_tuples_updated;
4862 			tabentry->tuples_deleted = tabmsg->t_counts.t_tuples_deleted;
4863 			tabentry->tuples_hot_updated = tabmsg->t_counts.t_tuples_hot_updated;
4864 			tabentry->n_live_tuples = tabmsg->t_counts.t_delta_live_tuples;
4865 			tabentry->n_dead_tuples = tabmsg->t_counts.t_delta_dead_tuples;
4866 			tabentry->changes_since_analyze = tabmsg->t_counts.t_changed_tuples;
4867 			tabentry->inserts_since_vacuum = tabmsg->t_counts.t_tuples_inserted;
4868 			tabentry->blocks_fetched = tabmsg->t_counts.t_blocks_fetched;
4869 			tabentry->blocks_hit = tabmsg->t_counts.t_blocks_hit;
4870 
4871 			tabentry->vacuum_timestamp = 0;
4872 			tabentry->vacuum_count = 0;
4873 			tabentry->autovac_vacuum_timestamp = 0;
4874 			tabentry->autovac_vacuum_count = 0;
4875 			tabentry->analyze_timestamp = 0;
4876 			tabentry->analyze_count = 0;
4877 			tabentry->autovac_analyze_timestamp = 0;
4878 			tabentry->autovac_analyze_count = 0;
4879 		}
4880 		else
4881 		{
4882 			/*
4883 			 * Otherwise add the values to the existing entry.
4884 			 */
4885 			tabentry->numscans += tabmsg->t_counts.t_numscans;
4886 			tabentry->tuples_returned += tabmsg->t_counts.t_tuples_returned;
4887 			tabentry->tuples_fetched += tabmsg->t_counts.t_tuples_fetched;
4888 			tabentry->tuples_inserted += tabmsg->t_counts.t_tuples_inserted;
4889 			tabentry->tuples_updated += tabmsg->t_counts.t_tuples_updated;
4890 			tabentry->tuples_deleted += tabmsg->t_counts.t_tuples_deleted;
4891 			tabentry->tuples_hot_updated += tabmsg->t_counts.t_tuples_hot_updated;
4892 			/* If table was truncated, first reset the live/dead counters */
4893 			if (tabmsg->t_counts.t_truncated)
4894 			{
4895 				tabentry->n_live_tuples = 0;
4896 				tabentry->n_dead_tuples = 0;
4897 				tabentry->inserts_since_vacuum = 0;
4898 			}
4899 			tabentry->n_live_tuples += tabmsg->t_counts.t_delta_live_tuples;
4900 			tabentry->n_dead_tuples += tabmsg->t_counts.t_delta_dead_tuples;
4901 			tabentry->changes_since_analyze += tabmsg->t_counts.t_changed_tuples;
4902 			tabentry->inserts_since_vacuum += tabmsg->t_counts.t_tuples_inserted;
4903 			tabentry->blocks_fetched += tabmsg->t_counts.t_blocks_fetched;
4904 			tabentry->blocks_hit += tabmsg->t_counts.t_blocks_hit;
4905 		}
4906 
4907 		/* Clamp n_live_tuples in case of negative delta_live_tuples */
4908 		tabentry->n_live_tuples = Max(tabentry->n_live_tuples, 0);
4909 		/* Likewise for n_dead_tuples */
4910 		tabentry->n_dead_tuples = Max(tabentry->n_dead_tuples, 0);
4911 
4912 		/*
4913 		 * Add per-table stats to the per-database entry, too.
4914 		 */
4915 		dbentry->n_tuples_returned += tabmsg->t_counts.t_tuples_returned;
4916 		dbentry->n_tuples_fetched += tabmsg->t_counts.t_tuples_fetched;
4917 		dbentry->n_tuples_inserted += tabmsg->t_counts.t_tuples_inserted;
4918 		dbentry->n_tuples_updated += tabmsg->t_counts.t_tuples_updated;
4919 		dbentry->n_tuples_deleted += tabmsg->t_counts.t_tuples_deleted;
4920 		dbentry->n_blocks_fetched += tabmsg->t_counts.t_blocks_fetched;
4921 		dbentry->n_blocks_hit += tabmsg->t_counts.t_blocks_hit;
4922 	}
4923 }
4924 
4925 
4926 /* ----------
4927  * pgstat_recv_tabpurge() -
4928  *
4929  *	Arrange for dead table removal.
4930  * ----------
4931  */
4932 static void
pgstat_recv_tabpurge(PgStat_MsgTabpurge * msg,int len)4933 pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len)
4934 {
4935 	PgStat_StatDBEntry *dbentry;
4936 	int			i;
4937 
4938 	dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
4939 
4940 	/*
4941 	 * No need to purge if we don't even know the database.
4942 	 */
4943 	if (!dbentry || !dbentry->tables)
4944 		return;
4945 
4946 	/*
4947 	 * Process all table entries in the message.
4948 	 */
4949 	for (i = 0; i < msg->m_nentries; i++)
4950 	{
4951 		/* Remove from hashtable if present; we don't care if it's not. */
4952 		(void) hash_search(dbentry->tables,
4953 						   (void *) &(msg->m_tableid[i]),
4954 						   HASH_REMOVE, NULL);
4955 	}
4956 }
4957 
4958 
4959 /* ----------
4960  * pgstat_recv_dropdb() -
4961  *
4962  *	Arrange for dead database removal
4963  * ----------
4964  */
4965 static void
pgstat_recv_dropdb(PgStat_MsgDropdb * msg,int len)4966 pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len)
4967 {
4968 	Oid			dbid = msg->m_databaseid;
4969 	PgStat_StatDBEntry *dbentry;
4970 
4971 	/*
4972 	 * Lookup the database in the hashtable.
4973 	 */
4974 	dbentry = pgstat_get_db_entry(dbid, false);
4975 
4976 	/*
4977 	 * If found, remove it (along with the db statfile).
4978 	 */
4979 	if (dbentry)
4980 	{
4981 		char		statfile[MAXPGPATH];
4982 
4983 		get_dbstat_filename(false, false, dbid, statfile, MAXPGPATH);
4984 
4985 		elog(DEBUG2, "removing stats file \"%s\"", statfile);
4986 		unlink(statfile);
4987 
4988 		if (dbentry->tables != NULL)
4989 			hash_destroy(dbentry->tables);
4990 		if (dbentry->functions != NULL)
4991 			hash_destroy(dbentry->functions);
4992 
4993 		if (hash_search(pgStatDBHash,
4994 						(void *) &dbid,
4995 						HASH_REMOVE, NULL) == NULL)
4996 			ereport(ERROR,
4997 					(errmsg("database hash table corrupted during cleanup --- abort")));
4998 	}
4999 }
5000 
5001 
5002 /* ----------
5003  * pgstat_recv_resetcounter() -
5004  *
5005  *	Reset the statistics for the specified database.
5006  * ----------
5007  */
5008 static void
pgstat_recv_resetcounter(PgStat_MsgResetcounter * msg,int len)5009 pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len)
5010 {
5011 	PgStat_StatDBEntry *dbentry;
5012 
5013 	/*
5014 	 * Lookup the database in the hashtable.  Nothing to do if not there.
5015 	 */
5016 	dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
5017 
5018 	if (!dbentry)
5019 		return;
5020 
5021 	/*
5022 	 * We simply throw away all the database's table entries by recreating a
5023 	 * new hash table for them.
5024 	 */
5025 	if (dbentry->tables != NULL)
5026 		hash_destroy(dbentry->tables);
5027 	if (dbentry->functions != NULL)
5028 		hash_destroy(dbentry->functions);
5029 
5030 	dbentry->tables = NULL;
5031 	dbentry->functions = NULL;
5032 
5033 	/*
5034 	 * Reset database-level stats, too.  This creates empty hash tables for
5035 	 * tables and functions.
5036 	 */
5037 	reset_dbentry_counters(dbentry);
5038 }
5039 
5040 /* ----------
5041  * pgstat_recv_resetsharedcounter() -
5042  *
5043  *	Reset some shared statistics of the cluster.
5044  * ----------
5045  */
5046 static void
pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter * msg,int len)5047 pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, int len)
5048 {
5049 	if (msg->m_resettarget == RESET_BGWRITER)
5050 	{
5051 		/* Reset the global background writer statistics for the cluster. */
5052 		memset(&globalStats, 0, sizeof(globalStats));
5053 		globalStats.stat_reset_timestamp = GetCurrentTimestamp();
5054 	}
5055 	else if (msg->m_resettarget == RESET_ARCHIVER)
5056 	{
5057 		/* Reset the archiver statistics for the cluster. */
5058 		memset(&archiverStats, 0, sizeof(archiverStats));
5059 		archiverStats.stat_reset_timestamp = GetCurrentTimestamp();
5060 	}
5061 	else if (msg->m_resettarget == RESET_WAL)
5062 	{
5063 		/* Reset the WAL statistics for the cluster. */
5064 		memset(&walStats, 0, sizeof(walStats));
5065 		walStats.stat_reset_timestamp = GetCurrentTimestamp();
5066 	}
5067 
5068 	/*
5069 	 * Presumably the sender of this message validated the target, don't
5070 	 * complain here if it's not valid
5071 	 */
5072 }
5073 
5074 /* ----------
5075  * pgstat_recv_resetsinglecounter() -
5076  *
5077  *	Reset a statistics for a single object
5078  * ----------
5079  */
5080 static void
pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter * msg,int len)5081 pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len)
5082 {
5083 	PgStat_StatDBEntry *dbentry;
5084 
5085 	dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
5086 
5087 	if (!dbentry)
5088 		return;
5089 
5090 	/* Set the reset timestamp for the whole database */
5091 	dbentry->stat_reset_timestamp = GetCurrentTimestamp();
5092 
5093 	/* Remove object if it exists, ignore it if not */
5094 	if (msg->m_resettype == RESET_TABLE)
5095 		(void) hash_search(dbentry->tables, (void *) &(msg->m_objectid),
5096 						   HASH_REMOVE, NULL);
5097 	else if (msg->m_resettype == RESET_FUNCTION)
5098 		(void) hash_search(dbentry->functions, (void *) &(msg->m_objectid),
5099 						   HASH_REMOVE, NULL);
5100 }
5101 
5102 /* ----------
5103  * pgstat_recv_resetslrucounter() -
5104  *
5105  *	Reset some SLRU statistics of the cluster.
5106  * ----------
5107  */
5108 static void
pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter * msg,int len)5109 pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter *msg, int len)
5110 {
5111 	int			i;
5112 	TimestampTz ts = GetCurrentTimestamp();
5113 
5114 	for (i = 0; i < SLRU_NUM_ELEMENTS; i++)
5115 	{
5116 		/* reset entry with the given index, or all entries (index is -1) */
5117 		if ((msg->m_index == -1) || (msg->m_index == i))
5118 		{
5119 			memset(&slruStats[i], 0, sizeof(slruStats[i]));
5120 			slruStats[i].stat_reset_timestamp = ts;
5121 		}
5122 	}
5123 }
5124 
5125 /* ----------
5126  * pgstat_recv_resetreplslotcounter() -
5127  *
5128  *	Reset some replication slot statistics of the cluster.
5129  * ----------
5130  */
5131 static void
pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter * msg,int len)5132 pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg,
5133 								 int len)
5134 {
5135 	PgStat_StatReplSlotEntry *slotent;
5136 	TimestampTz ts;
5137 
5138 	/* Return if we don't have replication slot statistics */
5139 	if (replSlotStatHash == NULL)
5140 		return;
5141 
5142 	ts = GetCurrentTimestamp();
5143 	if (msg->clearall)
5144 	{
5145 		HASH_SEQ_STATUS sstat;
5146 
5147 		hash_seq_init(&sstat, replSlotStatHash);
5148 		while ((slotent = (PgStat_StatReplSlotEntry *) hash_seq_search(&sstat)) != NULL)
5149 			pgstat_reset_replslot(slotent, ts);
5150 	}
5151 	else
5152 	{
5153 		/* Get the slot statistics to reset */
5154 		slotent = pgstat_get_replslot_entry(msg->m_slotname, false);
5155 
5156 		/*
5157 		 * Nothing to do if the given slot entry is not found.  This could
5158 		 * happen when the slot with the given name is removed and the
5159 		 * corresponding statistics entry is also removed before receiving the
5160 		 * reset message.
5161 		 */
5162 		if (!slotent)
5163 			return;
5164 
5165 		/* Reset the stats for the requested replication slot */
5166 		pgstat_reset_replslot(slotent, ts);
5167 	}
5168 }
5169 
5170 
5171 /* ----------
5172  * pgstat_recv_autovac() -
5173  *
5174  *	Process an autovacuum signaling message.
5175  * ----------
5176  */
5177 static void
pgstat_recv_autovac(PgStat_MsgAutovacStart * msg,int len)5178 pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len)
5179 {
5180 	PgStat_StatDBEntry *dbentry;
5181 
5182 	/*
5183 	 * Store the last autovacuum time in the database's hashtable entry.
5184 	 */
5185 	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5186 
5187 	dbentry->last_autovac_time = msg->m_start_time;
5188 }
5189 
5190 /* ----------
5191  * pgstat_recv_vacuum() -
5192  *
5193  *	Process a VACUUM message.
5194  * ----------
5195  */
5196 static void
pgstat_recv_vacuum(PgStat_MsgVacuum * msg,int len)5197 pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len)
5198 {
5199 	PgStat_StatDBEntry *dbentry;
5200 	PgStat_StatTabEntry *tabentry;
5201 
5202 	/*
5203 	 * Store the data in the table's hashtable entry.
5204 	 */
5205 	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5206 
5207 	tabentry = pgstat_get_tab_entry(dbentry, msg->m_tableoid, true);
5208 
5209 	tabentry->n_live_tuples = msg->m_live_tuples;
5210 	tabentry->n_dead_tuples = msg->m_dead_tuples;
5211 
5212 	/*
5213 	 * It is quite possible that a non-aggressive VACUUM ended up skipping
5214 	 * various pages, however, we'll zero the insert counter here regardless.
5215 	 * It's currently used only to track when we need to perform an "insert"
5216 	 * autovacuum, which are mainly intended to freeze newly inserted tuples.
5217 	 * Zeroing this may just mean we'll not try to vacuum the table again
5218 	 * until enough tuples have been inserted to trigger another insert
5219 	 * autovacuum.  An anti-wraparound autovacuum will catch any persistent
5220 	 * stragglers.
5221 	 */
5222 	tabentry->inserts_since_vacuum = 0;
5223 
5224 	if (msg->m_autovacuum)
5225 	{
5226 		tabentry->autovac_vacuum_timestamp = msg->m_vacuumtime;
5227 		tabentry->autovac_vacuum_count++;
5228 	}
5229 	else
5230 	{
5231 		tabentry->vacuum_timestamp = msg->m_vacuumtime;
5232 		tabentry->vacuum_count++;
5233 	}
5234 }
5235 
5236 /* ----------
5237  * pgstat_recv_analyze() -
5238  *
5239  *	Process an ANALYZE message.
5240  * ----------
5241  */
5242 static void
pgstat_recv_analyze(PgStat_MsgAnalyze * msg,int len)5243 pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len)
5244 {
5245 	PgStat_StatDBEntry *dbentry;
5246 	PgStat_StatTabEntry *tabentry;
5247 
5248 	/*
5249 	 * Store the data in the table's hashtable entry.
5250 	 */
5251 	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5252 
5253 	tabentry = pgstat_get_tab_entry(dbentry, msg->m_tableoid, true);
5254 
5255 	tabentry->n_live_tuples = msg->m_live_tuples;
5256 	tabentry->n_dead_tuples = msg->m_dead_tuples;
5257 
5258 	/*
5259 	 * If commanded, reset changes_since_analyze to zero.  This forgets any
5260 	 * changes that were committed while the ANALYZE was in progress, but we
5261 	 * have no good way to estimate how many of those there were.
5262 	 */
5263 	if (msg->m_resetcounter)
5264 		tabentry->changes_since_analyze = 0;
5265 
5266 	if (msg->m_autovacuum)
5267 	{
5268 		tabentry->autovac_analyze_timestamp = msg->m_analyzetime;
5269 		tabentry->autovac_analyze_count++;
5270 	}
5271 	else
5272 	{
5273 		tabentry->analyze_timestamp = msg->m_analyzetime;
5274 		tabentry->analyze_count++;
5275 	}
5276 }
5277 
5278 
5279 /* ----------
5280  * pgstat_recv_archiver() -
5281  *
5282  *	Process a ARCHIVER message.
5283  * ----------
5284  */
5285 static void
pgstat_recv_archiver(PgStat_MsgArchiver * msg,int len)5286 pgstat_recv_archiver(PgStat_MsgArchiver *msg, int len)
5287 {
5288 	if (msg->m_failed)
5289 	{
5290 		/* Failed archival attempt */
5291 		++archiverStats.failed_count;
5292 		memcpy(archiverStats.last_failed_wal, msg->m_xlog,
5293 			   sizeof(archiverStats.last_failed_wal));
5294 		archiverStats.last_failed_timestamp = msg->m_timestamp;
5295 	}
5296 	else
5297 	{
5298 		/* Successful archival operation */
5299 		++archiverStats.archived_count;
5300 		memcpy(archiverStats.last_archived_wal, msg->m_xlog,
5301 			   sizeof(archiverStats.last_archived_wal));
5302 		archiverStats.last_archived_timestamp = msg->m_timestamp;
5303 	}
5304 }
5305 
5306 /* ----------
5307  * pgstat_recv_bgwriter() -
5308  *
5309  *	Process a BGWRITER message.
5310  * ----------
5311  */
5312 static void
pgstat_recv_bgwriter(PgStat_MsgBgWriter * msg,int len)5313 pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len)
5314 {
5315 	globalStats.timed_checkpoints += msg->m_timed_checkpoints;
5316 	globalStats.requested_checkpoints += msg->m_requested_checkpoints;
5317 	globalStats.checkpoint_write_time += msg->m_checkpoint_write_time;
5318 	globalStats.checkpoint_sync_time += msg->m_checkpoint_sync_time;
5319 	globalStats.buf_written_checkpoints += msg->m_buf_written_checkpoints;
5320 	globalStats.buf_written_clean += msg->m_buf_written_clean;
5321 	globalStats.maxwritten_clean += msg->m_maxwritten_clean;
5322 	globalStats.buf_written_backend += msg->m_buf_written_backend;
5323 	globalStats.buf_fsync_backend += msg->m_buf_fsync_backend;
5324 	globalStats.buf_alloc += msg->m_buf_alloc;
5325 }
5326 
5327 /* ----------
5328  * pgstat_recv_wal() -
5329  *
5330  *	Process a WAL message.
5331  * ----------
5332  */
5333 static void
pgstat_recv_wal(PgStat_MsgWal * msg,int len)5334 pgstat_recv_wal(PgStat_MsgWal *msg, int len)
5335 {
5336 	walStats.wal_records += msg->m_wal_records;
5337 	walStats.wal_fpi += msg->m_wal_fpi;
5338 	walStats.wal_bytes += msg->m_wal_bytes;
5339 	walStats.wal_buffers_full += msg->m_wal_buffers_full;
5340 	walStats.wal_write += msg->m_wal_write;
5341 	walStats.wal_sync += msg->m_wal_sync;
5342 	walStats.wal_write_time += msg->m_wal_write_time;
5343 	walStats.wal_sync_time += msg->m_wal_sync_time;
5344 }
5345 
5346 /* ----------
5347  * pgstat_recv_slru() -
5348  *
5349  *	Process a SLRU message.
5350  * ----------
5351  */
5352 static void
pgstat_recv_slru(PgStat_MsgSLRU * msg,int len)5353 pgstat_recv_slru(PgStat_MsgSLRU *msg, int len)
5354 {
5355 	slruStats[msg->m_index].blocks_zeroed += msg->m_blocks_zeroed;
5356 	slruStats[msg->m_index].blocks_hit += msg->m_blocks_hit;
5357 	slruStats[msg->m_index].blocks_read += msg->m_blocks_read;
5358 	slruStats[msg->m_index].blocks_written += msg->m_blocks_written;
5359 	slruStats[msg->m_index].blocks_exists += msg->m_blocks_exists;
5360 	slruStats[msg->m_index].flush += msg->m_flush;
5361 	slruStats[msg->m_index].truncate += msg->m_truncate;
5362 }
5363 
5364 /* ----------
5365  * pgstat_recv_recoveryconflict() -
5366  *
5367  *	Process a RECOVERYCONFLICT message.
5368  * ----------
5369  */
5370 static void
pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict * msg,int len)5371 pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len)
5372 {
5373 	PgStat_StatDBEntry *dbentry;
5374 
5375 	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5376 
5377 	switch (msg->m_reason)
5378 	{
5379 		case PROCSIG_RECOVERY_CONFLICT_DATABASE:
5380 
5381 			/*
5382 			 * Since we drop the information about the database as soon as it
5383 			 * replicates, there is no point in counting these conflicts.
5384 			 */
5385 			break;
5386 		case PROCSIG_RECOVERY_CONFLICT_TABLESPACE:
5387 			dbentry->n_conflict_tablespace++;
5388 			break;
5389 		case PROCSIG_RECOVERY_CONFLICT_LOCK:
5390 			dbentry->n_conflict_lock++;
5391 			break;
5392 		case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
5393 			dbentry->n_conflict_snapshot++;
5394 			break;
5395 		case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN:
5396 			dbentry->n_conflict_bufferpin++;
5397 			break;
5398 		case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
5399 			dbentry->n_conflict_startup_deadlock++;
5400 			break;
5401 	}
5402 }
5403 
5404 /* ----------
5405  * pgstat_recv_deadlock() -
5406  *
5407  *	Process a DEADLOCK message.
5408  * ----------
5409  */
5410 static void
pgstat_recv_deadlock(PgStat_MsgDeadlock * msg,int len)5411 pgstat_recv_deadlock(PgStat_MsgDeadlock *msg, int len)
5412 {
5413 	PgStat_StatDBEntry *dbentry;
5414 
5415 	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5416 
5417 	dbentry->n_deadlocks++;
5418 }
5419 
5420 /* ----------
5421  * pgstat_recv_checksum_failure() -
5422  *
5423  *	Process a CHECKSUMFAILURE message.
5424  * ----------
5425  */
5426 static void
pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure * msg,int len)5427 pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len)
5428 {
5429 	PgStat_StatDBEntry *dbentry;
5430 
5431 	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5432 
5433 	dbentry->n_checksum_failures += msg->m_failurecount;
5434 	dbentry->last_checksum_failure = msg->m_failure_time;
5435 }
5436 
5437 /* ----------
5438  * pgstat_recv_replslot() -
5439  *
5440  *	Process a REPLSLOT message.
5441  * ----------
5442  */
5443 static void
pgstat_recv_replslot(PgStat_MsgReplSlot * msg,int len)5444 pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
5445 {
5446 	if (msg->m_drop)
5447 	{
5448 		Assert(!msg->m_create);
5449 
5450 		/* Remove the replication slot statistics with the given name */
5451 		if (replSlotStatHash != NULL)
5452 			(void) hash_search(replSlotStatHash,
5453 							   (void *) &(msg->m_slotname),
5454 							   HASH_REMOVE,
5455 							   NULL);
5456 	}
5457 	else
5458 	{
5459 		PgStat_StatReplSlotEntry *slotent;
5460 
5461 		slotent = pgstat_get_replslot_entry(msg->m_slotname, true);
5462 		Assert(slotent);
5463 
5464 		if (msg->m_create)
5465 		{
5466 			/*
5467 			 * If the message for dropping the slot with the same name gets
5468 			 * lost, slotent has stats for the old slot. So we initialize all
5469 			 * counters at slot creation.
5470 			 */
5471 			pgstat_reset_replslot(slotent, 0);
5472 		}
5473 		else
5474 		{
5475 			/* Update the replication slot statistics */
5476 			slotent->spill_txns += msg->m_spill_txns;
5477 			slotent->spill_count += msg->m_spill_count;
5478 			slotent->spill_bytes += msg->m_spill_bytes;
5479 			slotent->stream_txns += msg->m_stream_txns;
5480 			slotent->stream_count += msg->m_stream_count;
5481 			slotent->stream_bytes += msg->m_stream_bytes;
5482 			slotent->total_txns += msg->m_total_txns;
5483 			slotent->total_bytes += msg->m_total_bytes;
5484 		}
5485 	}
5486 }
5487 
5488 /* ----------
5489  * pgstat_recv_connect() -
5490  *
5491  *	Process a CONNECT message.
5492  * ----------
5493  */
5494 static void
pgstat_recv_connect(PgStat_MsgConnect * msg,int len)5495 pgstat_recv_connect(PgStat_MsgConnect *msg, int len)
5496 {
5497 	PgStat_StatDBEntry *dbentry;
5498 
5499 	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5500 	dbentry->n_sessions++;
5501 }
5502 
5503 /* ----------
5504  * pgstat_recv_disconnect() -
5505  *
5506  *	Process a DISCONNECT message.
5507  * ----------
5508  */
5509 static void
pgstat_recv_disconnect(PgStat_MsgDisconnect * msg,int len)5510 pgstat_recv_disconnect(PgStat_MsgDisconnect *msg, int len)
5511 {
5512 	PgStat_StatDBEntry *dbentry;
5513 
5514 	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5515 
5516 	switch (msg->m_cause)
5517 	{
5518 		case DISCONNECT_NOT_YET:
5519 		case DISCONNECT_NORMAL:
5520 			/* we don't collect these */
5521 			break;
5522 		case DISCONNECT_CLIENT_EOF:
5523 			dbentry->n_sessions_abandoned++;
5524 			break;
5525 		case DISCONNECT_FATAL:
5526 			dbentry->n_sessions_fatal++;
5527 			break;
5528 		case DISCONNECT_KILLED:
5529 			dbentry->n_sessions_killed++;
5530 			break;
5531 	}
5532 }
5533 
5534 /* ----------
5535  * pgstat_recv_tempfile() -
5536  *
5537  *	Process a TEMPFILE message.
5538  * ----------
5539  */
5540 static void
pgstat_recv_tempfile(PgStat_MsgTempFile * msg,int len)5541 pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len)
5542 {
5543 	PgStat_StatDBEntry *dbentry;
5544 
5545 	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5546 
5547 	dbentry->n_temp_bytes += msg->m_filesize;
5548 	dbentry->n_temp_files += 1;
5549 }
5550 
5551 /* ----------
5552  * pgstat_recv_funcstat() -
5553  *
5554  *	Count what the backend has done.
5555  * ----------
5556  */
5557 static void
pgstat_recv_funcstat(PgStat_MsgFuncstat * msg,int len)5558 pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len)
5559 {
5560 	PgStat_FunctionEntry *funcmsg = &(msg->m_entry[0]);
5561 	PgStat_StatDBEntry *dbentry;
5562 	PgStat_StatFuncEntry *funcentry;
5563 	int			i;
5564 	bool		found;
5565 
5566 	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5567 
5568 	/*
5569 	 * Process all function entries in the message.
5570 	 */
5571 	for (i = 0; i < msg->m_nentries; i++, funcmsg++)
5572 	{
5573 		funcentry = (PgStat_StatFuncEntry *) hash_search(dbentry->functions,
5574 														 (void *) &(funcmsg->f_id),
5575 														 HASH_ENTER, &found);
5576 
5577 		if (!found)
5578 		{
5579 			/*
5580 			 * If it's a new function entry, initialize counters to the values
5581 			 * we just got.
5582 			 */
5583 			funcentry->f_numcalls = funcmsg->f_numcalls;
5584 			funcentry->f_total_time = funcmsg->f_total_time;
5585 			funcentry->f_self_time = funcmsg->f_self_time;
5586 		}
5587 		else
5588 		{
5589 			/*
5590 			 * Otherwise add the values to the existing entry.
5591 			 */
5592 			funcentry->f_numcalls += funcmsg->f_numcalls;
5593 			funcentry->f_total_time += funcmsg->f_total_time;
5594 			funcentry->f_self_time += funcmsg->f_self_time;
5595 		}
5596 	}
5597 }
5598 
5599 /* ----------
5600  * pgstat_recv_funcpurge() -
5601  *
5602  *	Arrange for dead function removal.
5603  * ----------
5604  */
5605 static void
pgstat_recv_funcpurge(PgStat_MsgFuncpurge * msg,int len)5606 pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len)
5607 {
5608 	PgStat_StatDBEntry *dbentry;
5609 	int			i;
5610 
5611 	dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
5612 
5613 	/*
5614 	 * No need to purge if we don't even know the database.
5615 	 */
5616 	if (!dbentry || !dbentry->functions)
5617 		return;
5618 
5619 	/*
5620 	 * Process all function entries in the message.
5621 	 */
5622 	for (i = 0; i < msg->m_nentries; i++)
5623 	{
5624 		/* Remove from hashtable if present; we don't care if it's not. */
5625 		(void) hash_search(dbentry->functions,
5626 						   (void *) &(msg->m_functionid[i]),
5627 						   HASH_REMOVE, NULL);
5628 	}
5629 }
5630 
5631 /* ----------
5632  * pgstat_write_statsfile_needed() -
5633  *
5634  *	Do we need to write out any stats files?
5635  * ----------
5636  */
5637 static bool
pgstat_write_statsfile_needed(void)5638 pgstat_write_statsfile_needed(void)
5639 {
5640 	if (pending_write_requests != NIL)
5641 		return true;
5642 
5643 	/* Everything was written recently */
5644 	return false;
5645 }
5646 
5647 /* ----------
5648  * pgstat_db_requested() -
5649  *
5650  *	Checks whether stats for a particular DB need to be written to a file.
5651  * ----------
5652  */
5653 static bool
pgstat_db_requested(Oid databaseid)5654 pgstat_db_requested(Oid databaseid)
5655 {
5656 	/*
5657 	 * If any requests are outstanding at all, we should write the stats for
5658 	 * shared catalogs (the "database" with OID 0).  This ensures that
5659 	 * backends will see up-to-date stats for shared catalogs, even though
5660 	 * they send inquiry messages mentioning only their own DB.
5661 	 */
5662 	if (databaseid == InvalidOid && pending_write_requests != NIL)
5663 		return true;
5664 
5665 	/* Search to see if there's an open request to write this database. */
5666 	if (list_member_oid(pending_write_requests, databaseid))
5667 		return true;
5668 
5669 	return false;
5670 }
5671 
5672 /* ----------
5673  * pgstat_replslot_entry
5674  *
5675  * Return the entry of replication slot stats with the given name. Return
5676  * NULL if not found and the caller didn't request to create it.
5677  *
5678  * create tells whether to create the new slot entry if it is not found.
5679  * ----------
5680  */
5681 static PgStat_StatReplSlotEntry *
pgstat_get_replslot_entry(NameData name,bool create)5682 pgstat_get_replslot_entry(NameData name, bool create)
5683 {
5684 	PgStat_StatReplSlotEntry *slotent;
5685 	bool		found;
5686 
5687 	if (replSlotStatHash == NULL)
5688 	{
5689 		HASHCTL		hash_ctl;
5690 
5691 		/*
5692 		 * Quick return NULL if the hash table is empty and the caller didn't
5693 		 * request to create the entry.
5694 		 */
5695 		if (!create)
5696 			return NULL;
5697 
5698 		hash_ctl.keysize = sizeof(NameData);
5699 		hash_ctl.entrysize = sizeof(PgStat_StatReplSlotEntry);
5700 		replSlotStatHash = hash_create("Replication slots hash",
5701 									   PGSTAT_REPLSLOT_HASH_SIZE,
5702 									   &hash_ctl,
5703 									   HASH_ELEM | HASH_BLOBS);
5704 	}
5705 
5706 	slotent = (PgStat_StatReplSlotEntry *) hash_search(replSlotStatHash,
5707 													   (void *) &name,
5708 													   create ? HASH_ENTER : HASH_FIND,
5709 													   &found);
5710 
5711 	if (!slotent)
5712 	{
5713 		/* not found */
5714 		Assert(!create && !found);
5715 		return NULL;
5716 	}
5717 
5718 	/* initialize the entry */
5719 	if (create && !found)
5720 	{
5721 		namestrcpy(&(slotent->slotname), NameStr(name));
5722 		pgstat_reset_replslot(slotent, 0);
5723 	}
5724 
5725 	return slotent;
5726 }
5727 
5728 /* ----------
5729  * pgstat_reset_replslot
5730  *
5731  * Reset the given replication slot stats.
5732  * ----------
5733  */
5734 static void
pgstat_reset_replslot(PgStat_StatReplSlotEntry * slotent,TimestampTz ts)5735 pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotent, TimestampTz ts)
5736 {
5737 	/* reset only counters. Don't clear slot name */
5738 	slotent->spill_txns = 0;
5739 	slotent->spill_count = 0;
5740 	slotent->spill_bytes = 0;
5741 	slotent->stream_txns = 0;
5742 	slotent->stream_count = 0;
5743 	slotent->stream_bytes = 0;
5744 	slotent->total_txns = 0;
5745 	slotent->total_bytes = 0;
5746 	slotent->stat_reset_timestamp = ts;
5747 }
5748 
5749 /*
5750  * pgstat_slru_index
5751  *
5752  * Determine index of entry for a SLRU with a given name. If there's no exact
5753  * match, returns index of the last "other" entry used for SLRUs defined in
5754  * external projects.
5755  */
5756 int
pgstat_slru_index(const char * name)5757 pgstat_slru_index(const char *name)
5758 {
5759 	int			i;
5760 
5761 	for (i = 0; i < SLRU_NUM_ELEMENTS; i++)
5762 	{
5763 		if (strcmp(slru_names[i], name) == 0)
5764 			return i;
5765 	}
5766 
5767 	/* return index of the last entry (which is the "other" one) */
5768 	return (SLRU_NUM_ELEMENTS - 1);
5769 }
5770 
5771 /*
5772  * pgstat_slru_name
5773  *
5774  * Returns SLRU name for an index. The index may be above SLRU_NUM_ELEMENTS,
5775  * in which case this returns NULL. This allows writing code that does not
5776  * know the number of entries in advance.
5777  */
5778 const char *
pgstat_slru_name(int slru_idx)5779 pgstat_slru_name(int slru_idx)
5780 {
5781 	if (slru_idx < 0 || slru_idx >= SLRU_NUM_ELEMENTS)
5782 		return NULL;
5783 
5784 	return slru_names[slru_idx];
5785 }
5786 
5787 /*
5788  * slru_entry
5789  *
5790  * Returns pointer to entry with counters for given SLRU (based on the name
5791  * stored in SlruCtl as lwlock tranche name).
5792  */
5793 static inline PgStat_MsgSLRU *
slru_entry(int slru_idx)5794 slru_entry(int slru_idx)
5795 {
5796 	/*
5797 	 * The postmaster should never register any SLRU statistics counts; if it
5798 	 * did, the counts would be duplicated into child processes via fork().
5799 	 */
5800 	Assert(IsUnderPostmaster || !IsPostmasterEnvironment);
5801 
5802 	Assert((slru_idx >= 0) && (slru_idx < SLRU_NUM_ELEMENTS));
5803 
5804 	return &SLRUStats[slru_idx];
5805 }
5806 
5807 /*
5808  * SLRU statistics count accumulation functions --- called from slru.c
5809  */
5810 
5811 void
pgstat_count_slru_page_zeroed(int slru_idx)5812 pgstat_count_slru_page_zeroed(int slru_idx)
5813 {
5814 	slru_entry(slru_idx)->m_blocks_zeroed += 1;
5815 }
5816 
5817 void
pgstat_count_slru_page_hit(int slru_idx)5818 pgstat_count_slru_page_hit(int slru_idx)
5819 {
5820 	slru_entry(slru_idx)->m_blocks_hit += 1;
5821 }
5822 
5823 void
pgstat_count_slru_page_exists(int slru_idx)5824 pgstat_count_slru_page_exists(int slru_idx)
5825 {
5826 	slru_entry(slru_idx)->m_blocks_exists += 1;
5827 }
5828 
5829 void
pgstat_count_slru_page_read(int slru_idx)5830 pgstat_count_slru_page_read(int slru_idx)
5831 {
5832 	slru_entry(slru_idx)->m_blocks_read += 1;
5833 }
5834 
5835 void
pgstat_count_slru_page_written(int slru_idx)5836 pgstat_count_slru_page_written(int slru_idx)
5837 {
5838 	slru_entry(slru_idx)->m_blocks_written += 1;
5839 }
5840 
5841 void
pgstat_count_slru_flush(int slru_idx)5842 pgstat_count_slru_flush(int slru_idx)
5843 {
5844 	slru_entry(slru_idx)->m_flush += 1;
5845 }
5846 
5847 void
pgstat_count_slru_truncate(int slru_idx)5848 pgstat_count_slru_truncate(int slru_idx)
5849 {
5850 	slru_entry(slru_idx)->m_truncate += 1;
5851 }
5852