1 /* ----------
2 * pgstat.c
3 *
4 * All the statistics collector stuff hacked up in one big, ugly file.
5 *
6 * TODO: - Separate collector, postmaster and backend stuff
7 * into different files.
8 *
9 * - Add some automatic call for pgstat vacuuming.
10 *
11 * - Add a pgstat config column to pg_database, so this
12 * entire thing can be enabled/disabled on a per db basis.
13 *
14 * Copyright (c) 2001-2021, PostgreSQL Global Development Group
15 *
16 * src/backend/postmaster/pgstat.c
17 * ----------
18 */
19 #include "postgres.h"
20
21 #include <unistd.h>
22 #include <fcntl.h>
23 #include <sys/param.h>
24 #include <sys/time.h>
25 #include <sys/socket.h>
26 #include <netdb.h>
27 #include <netinet/in.h>
28 #include <arpa/inet.h>
29 #include <signal.h>
30 #include <time.h>
31 #ifdef HAVE_SYS_SELECT_H
32 #include <sys/select.h>
33 #endif
34
35 #include "access/heapam.h"
36 #include "access/htup_details.h"
37 #include "access/tableam.h"
38 #include "access/transam.h"
39 #include "access/twophase_rmgr.h"
40 #include "access/xact.h"
41 #include "catalog/pg_database.h"
42 #include "catalog/pg_proc.h"
43 #include "common/ip.h"
44 #include "executor/instrument.h"
45 #include "libpq/libpq.h"
46 #include "libpq/pqsignal.h"
47 #include "mb/pg_wchar.h"
48 #include "miscadmin.h"
49 #include "pgstat.h"
50 #include "postmaster/autovacuum.h"
51 #include "postmaster/fork_process.h"
52 #include "postmaster/interrupt.h"
53 #include "postmaster/postmaster.h"
54 #include "replication/slot.h"
55 #include "replication/walsender.h"
56 #include "storage/backendid.h"
57 #include "storage/dsm.h"
58 #include "storage/fd.h"
59 #include "storage/ipc.h"
60 #include "storage/latch.h"
61 #include "storage/lmgr.h"
62 #include "storage/pg_shmem.h"
63 #include "storage/proc.h"
64 #include "storage/procsignal.h"
65 #include "utils/builtins.h"
66 #include "utils/guc.h"
67 #include "utils/memutils.h"
68 #include "utils/ps_status.h"
69 #include "utils/rel.h"
70 #include "utils/snapmgr.h"
71 #include "utils/timestamp.h"
72
/* ----------
 * Timer definitions.
 *
 * Every interval below is in milliseconds, except PGSTAT_RESTART_INTERVAL,
 * which is in seconds.
 * ----------
 */
#define PGSTAT_STAT_INTERVAL    500     /* Minimum time between stats file
                                         * updates; in milliseconds. */

#define PGSTAT_RETRY_DELAY      10      /* How long to wait between checks for a
                                         * new file; in milliseconds. */

#define PGSTAT_MAX_WAIT_TIME    10000   /* Maximum time to wait for a stats
                                         * file update; in milliseconds. */

#define PGSTAT_INQ_INTERVAL     640     /* How often to ping the collector for a
                                         * new file; in milliseconds. */

#define PGSTAT_RESTART_INTERVAL 60      /* How often to attempt to restart a
                                         * failed statistics collector; in
                                         * seconds. */

/*
 * The two wait limits above expressed as a number of PGSTAT_RETRY_DELAY
 * sleeps: 10000 / 10 = 1000 polls, and 640 / 10 = 64 polls between pings.
 */
#define PGSTAT_POLL_LOOP_COUNT  (PGSTAT_MAX_WAIT_TIME / PGSTAT_RETRY_DELAY)
#define PGSTAT_INQ_LOOP_COUNT   (PGSTAT_INQ_INTERVAL / PGSTAT_RETRY_DELAY)

/*
 * Minimum receive buffer size for the collector's socket (100 kB).
 * pgstat_init() only ever raises the kernel's buffer size to this value; it
 * never lowers a larger one.
 */
#define PGSTAT_MIN_RCVBUF       (100 * 1024)


/* ----------
 * The initial size hints for the hash tables used in the collector.
 * ----------
 */
#define PGSTAT_DB_HASH_SIZE         16
#define PGSTAT_TAB_HASH_SIZE        512
#define PGSTAT_FUNCTION_HASH_SIZE   512
#define PGSTAT_REPLSLOT_HASH_SIZE   32
108
109
/* ----------
 * GUC parameters
 *
 * NOTE(review): pgstat_track_counts presumably backs the "track_counts"
 * setting that pgstat_init() forces to "off" when no working socket can be
 * set up -- confirm against the GUC tables.
 * ----------
 */
bool        pgstat_track_counts = false;
int         pgstat_track_functions = TRACK_FUNC_OFF;

/* ----------
 * Built from GUC parameter
 *
 * pgstat_stat_directory is one of the two directories whose stats files
 * pgstat_reset_all() removes.
 * ----------
 */
char       *pgstat_stat_directory = NULL;
char       *pgstat_stat_filename = NULL;
char       *pgstat_stat_tmpname = NULL;

/*
 * BgWriter and WAL global statistics counters.
 * Stored directly in a stats message structure so they can be sent
 * without needing to copy things around.  We assume these init to zeroes.
 */
PgStat_MsgBgWriter BgWriterStats;
PgStat_MsgWal WalStats;
132
/*
 * WAL usage counters saved from pgWalUsage at the previous call to
 * pgstat_send_wal().  This is used to calculate how much WAL usage
 * happens between pgstat_send_wal() calls, by subtracting
 * the previous counters from the current ones.
 *
 * pgstat_report_stat() also compares its wal_records field against the
 * current pgWalUsage value to decide whether there is anything new to send.
 */
static WalUsage prevWalUsage;
140
/*
 * List of SLRU names that we keep stats for.  There is no central registry of
 * SLRUs, so we use this fixed list instead.  The "other" entry is used for
 * all SLRUs without an explicit entry (e.g. SLRUs in extensions), and must
 * stay the final element of the array.
 */
static const char *const slru_names[] = {
    "CommitTs",
    "MultiXactMember",
    "MultiXactOffset",
    "Notify",
    "Serial",
    "Subtrans",
    "Xact",
    "other"                     /* has to be last */
};

/* Number of entries in slru_names[], including the trailing "other" slot. */
#define SLRU_NUM_ELEMENTS   lengthof(slru_names)

/*
 * SLRU statistics counts waiting to be sent to the collector.  These are
 * stored directly in stats message format so they can be sent without needing
 * to copy things around.  We assume this variable inits to zeroes.  Entries
 * are one-to-one with slru_names[].
 */
static PgStat_MsgSLRU SLRUStats[SLRU_NUM_ELEMENTS];
166
/* ----------
 * Local data
 * ----------
 */

/*
 * The statistics socket, created by pgstat_init().  It stays at
 * PGINVALID_SOCKET when setup failed; pgstat_start() and
 * pgstat_send_tabstat() both do nothing in that case.
 */
NON_EXEC_STATIC pgsocket pgStatSock = PGINVALID_SOCKET;

/* Address of pgStatSock as reported by getsockname() in pgstat_init(). */
static struct sockaddr_storage pgStatAddr;

/*
 * Time of the most recent collector launch attempt.  pgstat_start() uses it
 * to enforce PGSTAT_RESTART_INTERVAL; allow_immediate_pgstat_restart()
 * clears it.
 */
static time_t last_pgstat_start_time;

static bool pgStatRunningInCollector = false;
178
/*
 * Structures in which backends store per-table info that's waiting to be
 * sent to the collector.
 *
 * NOTE: once allocated, TabStatusArray structures are never moved or deleted
 * for the life of the backend.  Also, we zero out the t_id fields of the
 * contained PgStat_TableStatus structs whenever they are not actively in use.
 * This allows relcache pgstat_info pointers to be treated as long-lived data,
 * avoiding repeated searches in pgstat_initstats() when a relation is
 * repeatedly opened during a transaction.
 *
 * pgstat_report_stat() walks the whole chain, zeroes the used part of each
 * tsa_entries[] array and resets tsa_used to 0 once the counts have been
 * copied into outgoing messages.
 */
#define TABSTAT_QUANTUM     100 /* we alloc this many at a time */

typedef struct TabStatusArray
{
    struct TabStatusArray *tsa_next;    /* link to next array, if any */
    int         tsa_used;       /* # entries currently used */
    PgStat_TableStatus tsa_entries[TABSTAT_QUANTUM];    /* per-table data */
} TabStatusArray;

/* Head of the chain of TabStatusArray chunks; NULL until one is allocated. */
static TabStatusArray *pgStatTabList = NULL;
200
/*
 * pgStatTabHash entry: map from relation OID to PgStat_TableStatus pointer
 */
typedef struct TabStatHashEntry
{
    Oid         t_id;           /* relation OID */
    PgStat_TableStatus *tsa_entry;  /* that relation's PgStat_TableStatus */
} TabStatHashEntry;

/*
 * Hash table for O(1) t_id -> tsa_entry lookup
 *
 * pgstat_report_stat() destroys this table and resets the pointer to NULL
 * before it clears the entries the table refers to.
 */
static HTAB *pgStatTabHash = NULL;

/*
 * Backends store per-function info that's waiting to be sent to the collector
 * in this hash table (indexed by function OID).
 */
static HTAB *pgStatFunctions = NULL;

/*
 * Indicates if backend has some function stats that it hasn't yet
 * sent to the collector.  pgstat_report_stat() checks this flag as part of
 * its "nothing to do" test.
 */
static bool have_function_stats = false;
226
/*
 * Tuple insertion/deletion counts for an open transaction can't be propagated
 * into PgStat_TableStatus counters until we know if it is going to commit
 * or abort.  Hence, we keep these counts in per-subxact structs that live
 * in TopTransactionContext.  This data structure is designed on the assumption
 * that subxacts won't usually modify very many tables.
 */
typedef struct PgStat_SubXactStatus
{
    int         nest_level;     /* subtransaction nest level */
    struct PgStat_SubXactStatus *prev;  /* higher-level subxact if any */
    PgStat_TableXactStatus *first;  /* head of list for this subxact */
} PgStat_SubXactStatus;

static PgStat_SubXactStatus *pgStatXactStack = NULL;

/*
 * Per-backend counters accumulated since the last report.
 *
 * pgstat_send_tabstat() copies the commit/rollback counts and the block
 * read/write times into the message for the backend's own database, copies
 * the active and idle-in-transaction times only when
 * pgstat_should_report_connstat() says so, and then resets all six
 * accumulators to zero.  pgLastSessionReportTime is advanced to the report
 * timestamp each time session time is reported.
 */
static int  pgStatXactCommit = 0;
static int  pgStatXactRollback = 0;
PgStat_Counter pgStatBlockReadTime = 0;
PgStat_Counter pgStatBlockWriteTime = 0;
static PgStat_Counter pgLastSessionReportTime = 0;
PgStat_Counter pgStatActiveTime = 0;
PgStat_Counter pgStatTransactionIdleTime = 0;
SessionEndType pgStatSessionEndCause = DISCONNECT_NORMAL;
251
/*
 * Record that's written to 2PC state file when pgstat state is persisted.
 *
 * The three *_pre_trunc fields carry the counts accumulated before a
 * truncate; t_truncated records whether the relation was truncated.
 */
typedef struct TwoPhasePgStatRecord
{
    PgStat_Counter tuples_inserted; /* tuples inserted in xact */
    PgStat_Counter tuples_updated;  /* tuples updated in xact */
    PgStat_Counter tuples_deleted;  /* tuples deleted in xact */
    PgStat_Counter inserted_pre_trunc;  /* tuples inserted prior to truncate */
    PgStat_Counter updated_pre_trunc;   /* tuples updated prior to truncate */
    PgStat_Counter deleted_pre_trunc;   /* tuples deleted prior to truncate */
    Oid         t_id;           /* table's OID */
    bool        t_shared;       /* is it a shared catalog? */
    bool        t_truncated;    /* was the relation truncated? */
} TwoPhasePgStatRecord;
265
/*
 * Info about current "snapshot" of stats file
 */
static MemoryContext pgStatLocalContext = NULL;
static HTAB *pgStatDBHash = NULL;

/*
 * Cluster wide statistics, kept in the stats collector.
 * Contains statistics that are not collected per database
 * or per table.
 *
 * slruStats[] has one slot per slru_names[] entry (both are sized by
 * SLRU_NUM_ELEMENTS).
 */
static PgStat_ArchiverStats archiverStats;
static PgStat_GlobalStats globalStats;
static PgStat_WalStats walStats;
static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
static HTAB *replSlotStatHash = NULL;

/*
 * List of OIDs of databases we need to write out.  If an entry is InvalidOid,
 * it means to write only the shared-catalog stats ("DB 0"); otherwise, we
 * will write both that DB's data and the shared stats.
 */
static List *pending_write_requests = NIL;

/*
 * Total time charged to functions so far in the current backend.
 * We use this to help separate "self" and "other" time charges.
 * (We assume this initializes to zero.)
 */
static instr_time total_func_time;
296
297
/* ----------
 * Local function forward declarations
 * ----------
 */

/* Collector process launch (pgstat_forkexec exists only in EXEC_BACKEND builds) */
#ifdef EXEC_BACKEND
static pid_t pgstat_forkexec(void);
#endif

NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]) pg_attribute_noreturn();

/* Hash entry lookup and stats file reading/writing */
static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create);
static PgStat_StatTabEntry *pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry,
                                                 Oid tableoid, bool create);
static void pgstat_write_statsfiles(bool permanent, bool allDbs);
static void pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent);
static HTAB *pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep);
static void pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash, bool permanent);
static void backend_read_statsfile(void);

static bool pgstat_write_statsfile_needed(void);
static bool pgstat_db_requested(Oid databaseid);

/* Replication slot statistics */
static PgStat_StatReplSlotEntry *pgstat_get_replslot_entry(NameData name, bool create_it);
static void pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotstats, TimestampTz ts);

/* Subroutines of pgstat_report_stat() and other backend-side helpers */
static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg, TimestampTz now);
static void pgstat_send_funcstats(void);
static void pgstat_send_slru(void);
static HTAB *pgstat_collect_oids(Oid catalogid, AttrNumber anum_oid);
static bool pgstat_should_report_connstat(void);
static void pgstat_report_disconnect(Oid dboid);

static PgStat_TableStatus *get_tabstat_entry(Oid rel_id, bool isshared);

static void pgstat_setup_memcxt(void);

/* Message header setup and transmission */
static void pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype);
static void pgstat_send(void *msg, int len);

/* One receive handler per message type */
static void pgstat_recv_inquiry(PgStat_MsgInquiry *msg, int len);
static void pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len);
static void pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len);
static void pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len);
static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len);
static void pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, int len);
static void pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len);
static void pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter *msg, int len);
static void pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, int len);
static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len);
static void pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len);
static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len);
static void pgstat_recv_archiver(PgStat_MsgArchiver *msg, int len);
static void pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len);
static void pgstat_recv_wal(PgStat_MsgWal *msg, int len);
static void pgstat_recv_slru(PgStat_MsgSLRU *msg, int len);
static void pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len);
static void pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len);
static void pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len);
static void pgstat_recv_deadlock(PgStat_MsgDeadlock *msg, int len);
static void pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len);
static void pgstat_recv_connect(PgStat_MsgConnect *msg, int len);
static void pgstat_recv_disconnect(PgStat_MsgDisconnect *msg, int len);
static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len);
static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
362
363 /* ------------------------------------------------------------
364 * Public functions called from postmaster follow
365 * ------------------------------------------------------------
366 */
367
/* Value of the one-byte datagram used to self-test a freshly made socket. */
#define TESTBYTEVAL ((char) 199)

/*
 * pgstat_try_address() -
 *
 *  Helper for pgstat_init(): try to turn one getaddrinfo() result into a
 *  working statistics socket.  The socket is created, bound to a
 *  kernel-assigned port, connected to its own address, and then checked by
 *  sending one byte through it and reading it back.
 *
 *  Returns true with pgStatSock and pgStatAddr set up on success.  On any
 *  failure the problem is logged at LOG level, the socket (if one was
 *  created) is closed, pgStatSock is left at PGINVALID_SOCKET, and false
 *  is returned so that the caller can move on to the next address.
 */
static bool
pgstat_try_address(const struct addrinfo *candidate)
{
    ACCEPT_TYPE_ARG3 addrlen;
    fd_set      readable;
    struct timeval tmo;
    char        probe;
    int         nready;

    /*
     * Create the socket.
     */
    pgStatSock = socket(candidate->ai_family, SOCK_DGRAM, 0);
    if (pgStatSock == PGINVALID_SOCKET)
    {
        ereport(LOG,
                (errcode_for_socket_access(),
                 errmsg("could not create socket for statistics collector: %m")));
        return false;
    }

    /*
     * Bind it to a kernel assigned port on localhost and get the assigned
     * port via getsockname().
     */
    if (bind(pgStatSock, candidate->ai_addr, candidate->ai_addrlen) < 0)
    {
        ereport(LOG,
                (errcode_for_socket_access(),
                 errmsg("could not bind socket for statistics collector: %m")));
        goto discard;
    }

    addrlen = sizeof(pgStatAddr);
    if (getsockname(pgStatSock, (struct sockaddr *) &pgStatAddr, &addrlen) < 0)
    {
        ereport(LOG,
                (errcode_for_socket_access(),
                 errmsg("could not get address of socket for statistics collector: %m")));
        goto discard;
    }

    /*
     * Connect the socket to its own address.  This saves a few cycles by
     * not having to respecify the target address on every send.  This also
     * provides a kernel-level check that only packets from this same
     * address will be received.
     */
    if (connect(pgStatSock, (struct sockaddr *) &pgStatAddr, addrlen) < 0)
    {
        ereport(LOG,
                (errcode_for_socket_access(),
                 errmsg("could not connect socket for statistics collector: %m")));
        goto discard;
    }

    /*
     * Try to send and receive a one-byte test message on the socket.  This
     * is to catch situations where the socket can be created but will not
     * actually pass data (for instance, because kernel packet filtering
     * rules prevent it).
     */
    probe = TESTBYTEVAL;

    /* Keep retrying the send for as long as it is merely interrupted. */
    while (send(pgStatSock, &probe, 1, 0) != 1)
    {
        if (errno == EINTR)
            continue;
        ereport(LOG,
                (errcode_for_socket_access(),
                 errmsg("could not send test message on socket for statistics collector: %m")));
        goto discard;
    }

    /*
     * There could possibly be a little delay before the message can be
     * received.  We arbitrarily allow up to half a second before deciding
     * it's broken.  The wait is restarted from scratch after EINTR.
     */
    do
    {
        FD_ZERO(&readable);
        FD_SET(pgStatSock, &readable);

        tmo.tv_sec = 0;
        tmo.tv_usec = 500000;
        nready = select(pgStatSock + 1, &readable, NULL, NULL, &tmo);
    } while (nready < 0 && errno == EINTR);

    if (nready < 0)
    {
        ereport(LOG,
                (errcode_for_socket_access(),
                 errmsg("select() failed in statistics collector: %m")));
        goto discard;
    }
    if (nready == 0 || !FD_ISSET(pgStatSock, &readable))
    {
        /*
         * This is the case we actually think is likely, so take pains to
         * give a specific message for it.
         *
         * errno will not be set meaningfully here, so don't use it.
         */
        ereport(LOG,
                (errcode(ERRCODE_CONNECTION_FAILURE),
                 errmsg("test message did not get through on socket for statistics collector")));
        goto discard;
    }

    probe++;                    /* just make sure variable is changed */

    /* Same interrupted-call retry policy as for the send above. */
    while (recv(pgStatSock, &probe, 1, 0) != 1)
    {
        if (errno == EINTR)
            continue;
        ereport(LOG,
                (errcode_for_socket_access(),
                 errmsg("could not receive test message on socket for statistics collector: %m")));
        goto discard;
    }

    if (probe != TESTBYTEVAL)   /* strictly paranoia ... */
    {
        ereport(LOG,
                (errcode(ERRCODE_INTERNAL_ERROR),
                 errmsg("incorrect test message transmission on socket for statistics collector")));
        goto discard;
    }

    /* If we get here, we have a working socket */
    return true;

discard:
    closesocket(pgStatSock);
    pgStatSock = PGINVALID_SOCKET;
    return false;
}

/* ----------
 * pgstat_init() -
 *
 *  Called from postmaster at startup.  Sets up the UDP socket used to
 *  carry statistics messages: resolves "localhost", tries each returned
 *  address until one yields a socket that passes a loopback self-test,
 *  switches that socket to non-blocking mode and enlarges its receive
 *  buffer if it is smaller than PGSTAT_MIN_RCVBUF.
 *
 *  Failure is never fatal.  Every problem is reported at LOG level and,
 *  if no usable socket can be obtained, pgStatSock is left at
 *  PGINVALID_SOCKET and track_counts is forced off, so the postmaster
 *  still starts, just without statistics collection.
 * ----------
 */
void
pgstat_init(void)
{
    struct addrinfo hints;
    struct addrinfo *addr_list = NULL;
    struct addrinfo *cur;
    int         gai_rc;
    int         attempts = 0;

    /*
     * This static assertion verifies that we didn't mess up the calculations
     * involved in selecting maximum payload sizes for our UDP messages.
     * Because the only consequence of overrunning PGSTAT_MAX_MSG_SIZE would
     * be silent performance loss from fragmentation, it seems worth having a
     * compile-time cross-check that we didn't.
     */
    StaticAssertStmt(sizeof(PgStat_Msg) <= PGSTAT_MAX_MSG_SIZE,
                     "maximum stats message size exceeds PGSTAT_MAX_MSG_SIZE");

    /*
     * Create the UDP socket for sending and receiving statistic messages
     */
    hints.ai_flags = AI_PASSIVE;
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_DGRAM;
    hints.ai_protocol = 0;
    hints.ai_addrlen = 0;
    hints.ai_addr = NULL;
    hints.ai_canonname = NULL;
    hints.ai_next = NULL;

    gai_rc = pg_getaddrinfo_all("localhost", NULL, &hints, &addr_list);
    if (gai_rc != 0 || addr_list == NULL)
    {
        ereport(LOG,
                (errmsg("could not resolve \"localhost\": %s",
                        gai_strerror(gai_rc))));
        goto startup_failed;
    }

    /*
     * On some platforms, pg_getaddrinfo_all() may return multiple addresses
     * only one of which will actually work (eg, both IPv6 and IPv4 addresses
     * when kernel will reject IPv6).  Worse, the failure may occur at the
     * bind() or perhaps even connect() stage.  So we must loop through the
     * results till we find a working combination.  We will generate LOG
     * messages, but no error, for bogus combinations.
     */
    for (cur = addr_list; cur != NULL; cur = cur->ai_next)
    {
#ifdef HAVE_UNIX_SOCKETS
        /* Ignore AF_UNIX sockets, if any are returned. */
        if (cur->ai_family == AF_UNIX)
            continue;
#endif

        attempts++;
        if (attempts > 1)
            ereport(LOG,
                    (errmsg("trying another address for the statistics collector")));

        if (pgstat_try_address(cur))
            break;
    }

    /* Did we find a working address? */
    if (cur == NULL || pgStatSock == PGINVALID_SOCKET)
        goto startup_failed;

    /*
     * Set the socket to non-blocking IO.  This ensures that if the collector
     * falls behind, statistics messages will be discarded; backends won't
     * block waiting to send messages to the collector.
     */
    if (!pg_set_noblock(pgStatSock))
    {
        ereport(LOG,
                (errcode_for_socket_access(),
                 errmsg("could not set statistics collector socket to nonblocking mode: %m")));
        goto startup_failed;
    }

    /*
     * Try to ensure that the socket's receive buffer is at least
     * PGSTAT_MIN_RCVBUF bytes, so that it won't easily overflow and lose
     * data.  Use of UDP protocol means that we are willing to lose data under
     * heavy load, but we don't want it to happen just because of ridiculously
     * small default buffer sizes (such as 8KB on older Windows versions).
     */
    {
        int         existing;
        int         target = PGSTAT_MIN_RCVBUF;
        ACCEPT_TYPE_ARG3 optlen = sizeof(existing);

        if (getsockopt(pgStatSock, SOL_SOCKET, SO_RCVBUF,
                       (char *) &existing, &optlen) < 0)
        {
            ereport(LOG,
                    (errmsg("%s(%s) failed: %m", "getsockopt", "SO_RCVBUF")));
            /* if we can't get existing size, always try to set it */
            existing = 0;
        }

        if (existing < target &&
            setsockopt(pgStatSock, SOL_SOCKET, SO_RCVBUF,
                       (char *) &target, sizeof(target)) < 0)
            ereport(LOG,
                    (errmsg("%s(%s) failed: %m", "setsockopt", "SO_RCVBUF")));
    }

    pg_freeaddrinfo_all(hints.ai_family, addr_list);

    /* Now that we have a long-lived socket, tell fd.c about it. */
    ReserveExternalFD();

    return;

startup_failed:
    ereport(LOG,
            (errmsg("disabling statistics collector for lack of working socket")));

    if (addr_list)
        pg_freeaddrinfo_all(hints.ai_family, addr_list);

    if (pgStatSock != PGINVALID_SOCKET)
        closesocket(pgStatSock);
    pgStatSock = PGINVALID_SOCKET;

    /*
     * Adjust GUC variables to suppress useless activity, and for debugging
     * purposes (seeing track_counts off is a clue that we failed here).  We
     * use PGC_S_OVERRIDE because there is no point in trying to turn it back
     * on from postgresql.conf without a restart.
     */
    SetConfigOption("track_counts", "off", PGC_INTERNAL, PGC_S_OVERRIDE);
}
660
661 /*
662 * subroutine for pgstat_reset_all
663 */
664 static void
pgstat_reset_remove_files(const char * directory)665 pgstat_reset_remove_files(const char *directory)
666 {
667 DIR *dir;
668 struct dirent *entry;
669 char fname[MAXPGPATH * 2];
670
671 dir = AllocateDir(directory);
672 while ((entry = ReadDir(dir, directory)) != NULL)
673 {
674 int nchars;
675 Oid tmp_oid;
676
677 /*
678 * Skip directory entries that don't match the file names we write.
679 * See get_dbstat_filename for the database-specific pattern.
680 */
681 if (strncmp(entry->d_name, "global.", 7) == 0)
682 nchars = 7;
683 else
684 {
685 nchars = 0;
686 (void) sscanf(entry->d_name, "db_%u.%n",
687 &tmp_oid, &nchars);
688 if (nchars <= 0)
689 continue;
690 /* %u allows leading whitespace, so reject that */
691 if (strchr("0123456789", entry->d_name[3]) == NULL)
692 continue;
693 }
694
695 if (strcmp(entry->d_name + nchars, "tmp") != 0 &&
696 strcmp(entry->d_name + nchars, "stat") != 0)
697 continue;
698
699 snprintf(fname, sizeof(fname), "%s/%s", directory,
700 entry->d_name);
701 unlink(fname);
702 }
703 FreeDir(dir);
704 }
705
706 /*
707 * pgstat_reset_all() -
708 *
709 * Remove the stats files. This is currently used only if WAL
710 * recovery is needed after a crash.
711 */
712 void
pgstat_reset_all(void)713 pgstat_reset_all(void)
714 {
715 pgstat_reset_remove_files(pgstat_stat_directory);
716 pgstat_reset_remove_files(PGSTAT_STAT_PERMANENT_DIRECTORY);
717 }
718
719 #ifdef EXEC_BACKEND
720
/*
 * pgstat_forkexec() -
 *
 *  Build the argument vector for the statistics collector process and hand
 *  it to postmaster_forkexec().  Returns whatever postmaster_forkexec()
 *  returns.
 */
static pid_t
pgstat_forkexec(void)
{
    char       *child_argv[10];
    int         child_argc;

    child_argv[0] = "postgres";
    child_argv[1] = "--forkcol";
    child_argv[2] = NULL;       /* filled in by postmaster_forkexec */
    child_argc = 3;

    /* terminate the vector; the terminator is not counted in child_argc */
    child_argv[child_argc] = NULL;
    Assert(child_argc < lengthof(child_argv));

    return postmaster_forkexec(child_argc, child_argv);
}
741 #endif /* EXEC_BACKEND */
742
743
744 /*
745 * pgstat_start() -
746 *
747 * Called from postmaster at startup or after an existing collector
748 * died. Attempt to fire up a fresh statistics collector.
749 *
750 * Returns PID of child process, or 0 if fail.
751 *
752 * Note: if fail, we will be called again from the postmaster main loop.
753 */
754 int
pgstat_start(void)755 pgstat_start(void)
756 {
757 time_t curtime;
758 pid_t pgStatPid;
759
760 /*
761 * Check that the socket is there, else pgstat_init failed and we can do
762 * nothing useful.
763 */
764 if (pgStatSock == PGINVALID_SOCKET)
765 return 0;
766
767 /*
768 * Do nothing if too soon since last collector start. This is a safety
769 * valve to protect against continuous respawn attempts if the collector
770 * is dying immediately at launch. Note that since we will be re-called
771 * from the postmaster main loop, we will get another chance later.
772 */
773 curtime = time(NULL);
774 if ((unsigned int) (curtime - last_pgstat_start_time) <
775 (unsigned int) PGSTAT_RESTART_INTERVAL)
776 return 0;
777 last_pgstat_start_time = curtime;
778
779 /*
780 * Okay, fork off the collector.
781 */
782 #ifdef EXEC_BACKEND
783 switch ((pgStatPid = pgstat_forkexec()))
784 #else
785 switch ((pgStatPid = fork_process()))
786 #endif
787 {
788 case -1:
789 ereport(LOG,
790 (errmsg("could not fork statistics collector: %m")));
791 return 0;
792
793 #ifndef EXEC_BACKEND
794 case 0:
795 /* in postmaster child ... */
796 InitPostmasterChild();
797
798 /* Close the postmaster's sockets */
799 ClosePostmasterPorts(false);
800
801 /* Drop our connection to postmaster's shared memory, as well */
802 dsm_detach_all();
803 PGSharedMemoryDetach();
804
805 PgstatCollectorMain(0, NULL);
806 break;
807 #endif
808
809 default:
810 return (int) pgStatPid;
811 }
812
813 /* shouldn't get here */
814 return 0;
815 }
816
817 void
allow_immediate_pgstat_restart(void)818 allow_immediate_pgstat_restart(void)
819 {
820 last_pgstat_start_time = 0;
821 }
822
823 /* ------------------------------------------------------------
824 * Public functions used by backends follow
825 *------------------------------------------------------------
826 */
827
828
829 /* ----------
830 * pgstat_report_stat() -
831 *
832 * Must be called by processes that performs DML: tcop/postgres.c, logical
833 * receiver processes, SPI worker, etc. to send the so far collected
834 * per-table and function usage statistics to the collector. Note that this
835 * is called only when not within a transaction, so it is fair to use
836 * transaction stop time as an approximation of current time.
837 *
838 * "disconnect" is "true" only for the last call before the backend
839 * exits. This makes sure that no data is lost and that interrupted
840 * sessions are reported correctly.
841 * ----------
842 */
843 void
pgstat_report_stat(bool disconnect)844 pgstat_report_stat(bool disconnect)
845 {
846 /* we assume this inits to all zeroes: */
847 static const PgStat_TableCounts all_zeroes;
848 static TimestampTz last_report = 0;
849
850 TimestampTz now;
851 PgStat_MsgTabstat regular_msg;
852 PgStat_MsgTabstat shared_msg;
853 TabStatusArray *tsa;
854 int i;
855
856 /*
857 * Don't expend a clock check if nothing to do.
858 *
859 * To determine whether any WAL activity has occurred since last time, not
860 * only the number of generated WAL records but also the numbers of WAL
861 * writes and syncs need to be checked. Because even transaction that
862 * generates no WAL records can write or sync WAL data when flushing the
863 * data pages.
864 */
865 if ((pgStatTabList == NULL || pgStatTabList->tsa_used == 0) &&
866 pgStatXactCommit == 0 && pgStatXactRollback == 0 &&
867 pgWalUsage.wal_records == prevWalUsage.wal_records &&
868 WalStats.m_wal_write == 0 && WalStats.m_wal_sync == 0 &&
869 !have_function_stats && !disconnect)
870 return;
871
872 /*
873 * Don't send a message unless it's been at least PGSTAT_STAT_INTERVAL
874 * msec since we last sent one, or the backend is about to exit.
875 */
876 now = GetCurrentTransactionStopTimestamp();
877 if (!disconnect &&
878 !TimestampDifferenceExceeds(last_report, now, PGSTAT_STAT_INTERVAL))
879 return;
880
881 last_report = now;
882
883 if (disconnect)
884 pgstat_report_disconnect(MyDatabaseId);
885
886 /*
887 * Destroy pgStatTabHash before we start invalidating PgStat_TableEntry
888 * entries it points to. (Should we fail partway through the loop below,
889 * it's okay to have removed the hashtable already --- the only
890 * consequence is we'd get multiple entries for the same table in the
891 * pgStatTabList, and that's safe.)
892 */
893 if (pgStatTabHash)
894 hash_destroy(pgStatTabHash);
895 pgStatTabHash = NULL;
896
897 /*
898 * Scan through the TabStatusArray struct(s) to find tables that actually
899 * have counts, and build messages to send. We have to separate shared
900 * relations from regular ones because the databaseid field in the message
901 * header has to depend on that.
902 */
903 regular_msg.m_databaseid = MyDatabaseId;
904 shared_msg.m_databaseid = InvalidOid;
905 regular_msg.m_nentries = 0;
906 shared_msg.m_nentries = 0;
907
908 for (tsa = pgStatTabList; tsa != NULL; tsa = tsa->tsa_next)
909 {
910 for (i = 0; i < tsa->tsa_used; i++)
911 {
912 PgStat_TableStatus *entry = &tsa->tsa_entries[i];
913 PgStat_MsgTabstat *this_msg;
914 PgStat_TableEntry *this_ent;
915
916 /* Shouldn't have any pending transaction-dependent counts */
917 Assert(entry->trans == NULL);
918
919 /*
920 * Ignore entries that didn't accumulate any actual counts, such
921 * as indexes that were opened by the planner but not used.
922 */
923 if (memcmp(&entry->t_counts, &all_zeroes,
924 sizeof(PgStat_TableCounts)) == 0)
925 continue;
926
927 /*
928 * OK, insert data into the appropriate message, and send if full.
929 */
930 this_msg = entry->t_shared ? &shared_msg : ®ular_msg;
931 this_ent = &this_msg->m_entry[this_msg->m_nentries];
932 this_ent->t_id = entry->t_id;
933 memcpy(&this_ent->t_counts, &entry->t_counts,
934 sizeof(PgStat_TableCounts));
935 if (++this_msg->m_nentries >= PGSTAT_NUM_TABENTRIES)
936 {
937 pgstat_send_tabstat(this_msg, now);
938 this_msg->m_nentries = 0;
939 }
940 }
941 /* zero out PgStat_TableStatus structs after use */
942 MemSet(tsa->tsa_entries, 0,
943 tsa->tsa_used * sizeof(PgStat_TableStatus));
944 tsa->tsa_used = 0;
945 }
946
947 /*
948 * Send partial messages. Make sure that any pending xact commit/abort
949 * and connection stats get counted, even if there are no table stats to
950 * send.
951 */
952 if (regular_msg.m_nentries > 0 ||
953 pgStatXactCommit > 0 || pgStatXactRollback > 0 || disconnect)
954 pgstat_send_tabstat(®ular_msg, now);
955 if (shared_msg.m_nentries > 0)
956 pgstat_send_tabstat(&shared_msg, now);
957
958 /* Now, send function statistics */
959 pgstat_send_funcstats();
960
961 /* Send WAL statistics */
962 pgstat_send_wal(true);
963
964 /* Finally send SLRU statistics */
965 pgstat_send_slru();
966 }
967
968 /*
969 * Subroutine for pgstat_report_stat: finish and send a tabstat message
970 */
971 static void
pgstat_send_tabstat(PgStat_MsgTabstat * tsmsg,TimestampTz now)972 pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg, TimestampTz now)
973 {
974 int n;
975 int len;
976
977 /* It's unlikely we'd get here with no socket, but maybe not impossible */
978 if (pgStatSock == PGINVALID_SOCKET)
979 return;
980
981 /*
982 * Report and reset accumulated xact commit/rollback and I/O timings
983 * whenever we send a normal tabstat message
984 */
985 if (OidIsValid(tsmsg->m_databaseid))
986 {
987 tsmsg->m_xact_commit = pgStatXactCommit;
988 tsmsg->m_xact_rollback = pgStatXactRollback;
989 tsmsg->m_block_read_time = pgStatBlockReadTime;
990 tsmsg->m_block_write_time = pgStatBlockWriteTime;
991
992 if (pgstat_should_report_connstat())
993 {
994 long secs;
995 int usecs;
996
997 /*
998 * pgLastSessionReportTime is initialized to MyStartTimestamp by
999 * pgstat_report_connect().
1000 */
1001 TimestampDifference(pgLastSessionReportTime, now, &secs, &usecs);
1002 pgLastSessionReportTime = now;
1003 tsmsg->m_session_time = (PgStat_Counter) secs * 1000000 + usecs;
1004 tsmsg->m_active_time = pgStatActiveTime;
1005 tsmsg->m_idle_in_xact_time = pgStatTransactionIdleTime;
1006 }
1007 else
1008 {
1009 tsmsg->m_session_time = 0;
1010 tsmsg->m_active_time = 0;
1011 tsmsg->m_idle_in_xact_time = 0;
1012 }
1013 pgStatXactCommit = 0;
1014 pgStatXactRollback = 0;
1015 pgStatBlockReadTime = 0;
1016 pgStatBlockWriteTime = 0;
1017 pgStatActiveTime = 0;
1018 pgStatTransactionIdleTime = 0;
1019 }
1020 else
1021 {
1022 tsmsg->m_xact_commit = 0;
1023 tsmsg->m_xact_rollback = 0;
1024 tsmsg->m_block_read_time = 0;
1025 tsmsg->m_block_write_time = 0;
1026 tsmsg->m_session_time = 0;
1027 tsmsg->m_active_time = 0;
1028 tsmsg->m_idle_in_xact_time = 0;
1029 }
1030
1031 n = tsmsg->m_nentries;
1032 len = offsetof(PgStat_MsgTabstat, m_entry[0]) +
1033 n * sizeof(PgStat_TableEntry);
1034
1035 pgstat_setheader(&tsmsg->m_hdr, PGSTAT_MTYPE_TABSTAT);
1036 pgstat_send(tsmsg, len);
1037 }
1038
1039 /*
1040 * Subroutine for pgstat_report_stat: populate and send a function stat message
1041 */
1042 static void
pgstat_send_funcstats(void)1043 pgstat_send_funcstats(void)
1044 {
1045 /* we assume this inits to all zeroes: */
1046 static const PgStat_FunctionCounts all_zeroes;
1047
1048 PgStat_MsgFuncstat msg;
1049 PgStat_BackendFunctionEntry *entry;
1050 HASH_SEQ_STATUS fstat;
1051
1052 if (pgStatFunctions == NULL)
1053 return;
1054
1055 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_FUNCSTAT);
1056 msg.m_databaseid = MyDatabaseId;
1057 msg.m_nentries = 0;
1058
1059 hash_seq_init(&fstat, pgStatFunctions);
1060 while ((entry = (PgStat_BackendFunctionEntry *) hash_seq_search(&fstat)) != NULL)
1061 {
1062 PgStat_FunctionEntry *m_ent;
1063
1064 /* Skip it if no counts accumulated since last time */
1065 if (memcmp(&entry->f_counts, &all_zeroes,
1066 sizeof(PgStat_FunctionCounts)) == 0)
1067 continue;
1068
1069 /* need to convert format of time accumulators */
1070 m_ent = &msg.m_entry[msg.m_nentries];
1071 m_ent->f_id = entry->f_id;
1072 m_ent->f_numcalls = entry->f_counts.f_numcalls;
1073 m_ent->f_total_time = INSTR_TIME_GET_MICROSEC(entry->f_counts.f_total_time);
1074 m_ent->f_self_time = INSTR_TIME_GET_MICROSEC(entry->f_counts.f_self_time);
1075
1076 if (++msg.m_nentries >= PGSTAT_NUM_FUNCENTRIES)
1077 {
1078 pgstat_send(&msg, offsetof(PgStat_MsgFuncstat, m_entry[0]) +
1079 msg.m_nentries * sizeof(PgStat_FunctionEntry));
1080 msg.m_nentries = 0;
1081 }
1082
1083 /* reset the entry's counts */
1084 MemSet(&entry->f_counts, 0, sizeof(PgStat_FunctionCounts));
1085 }
1086
1087 if (msg.m_nentries > 0)
1088 pgstat_send(&msg, offsetof(PgStat_MsgFuncstat, m_entry[0]) +
1089 msg.m_nentries * sizeof(PgStat_FunctionEntry));
1090
1091 have_function_stats = false;
1092 }
1093
1094
1095 /* ----------
1096 * pgstat_vacuum_stat() -
1097 *
1098 * Will tell the collector about objects he can get rid of.
1099 * ----------
1100 */
1101 void
pgstat_vacuum_stat(void)1102 pgstat_vacuum_stat(void)
1103 {
1104 HTAB *htab;
1105 PgStat_MsgTabpurge msg;
1106 PgStat_MsgFuncpurge f_msg;
1107 HASH_SEQ_STATUS hstat;
1108 PgStat_StatDBEntry *dbentry;
1109 PgStat_StatTabEntry *tabentry;
1110 PgStat_StatFuncEntry *funcentry;
1111 int len;
1112
1113 if (pgStatSock == PGINVALID_SOCKET)
1114 return;
1115
1116 /*
1117 * If not done for this transaction, read the statistics collector stats
1118 * file into some hash tables.
1119 */
1120 backend_read_statsfile();
1121
1122 /*
1123 * Read pg_database and make a list of OIDs of all existing databases
1124 */
1125 htab = pgstat_collect_oids(DatabaseRelationId, Anum_pg_database_oid);
1126
1127 /*
1128 * Search the database hash table for dead databases and tell the
1129 * collector to drop them.
1130 */
1131 hash_seq_init(&hstat, pgStatDBHash);
1132 while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
1133 {
1134 Oid dbid = dbentry->databaseid;
1135
1136 CHECK_FOR_INTERRUPTS();
1137
1138 /* the DB entry for shared tables (with InvalidOid) is never dropped */
1139 if (OidIsValid(dbid) &&
1140 hash_search(htab, (void *) &dbid, HASH_FIND, NULL) == NULL)
1141 pgstat_drop_database(dbid);
1142 }
1143
1144 /* Clean up */
1145 hash_destroy(htab);
1146
1147 /*
1148 * Search for all the dead replication slots in stats hashtable and tell
1149 * the stats collector to drop them.
1150 */
1151 if (replSlotStatHash)
1152 {
1153 PgStat_StatReplSlotEntry *slotentry;
1154
1155 hash_seq_init(&hstat, replSlotStatHash);
1156 while ((slotentry = (PgStat_StatReplSlotEntry *) hash_seq_search(&hstat)) != NULL)
1157 {
1158 CHECK_FOR_INTERRUPTS();
1159
1160 if (SearchNamedReplicationSlot(NameStr(slotentry->slotname), true) == NULL)
1161 pgstat_report_replslot_drop(NameStr(slotentry->slotname));
1162 }
1163 }
1164
1165 /*
1166 * Lookup our own database entry; if not found, nothing more to do.
1167 */
1168 dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
1169 (void *) &MyDatabaseId,
1170 HASH_FIND, NULL);
1171 if (dbentry == NULL || dbentry->tables == NULL)
1172 return;
1173
1174 /*
1175 * Similarly to above, make a list of all known relations in this DB.
1176 */
1177 htab = pgstat_collect_oids(RelationRelationId, Anum_pg_class_oid);
1178
1179 /*
1180 * Initialize our messages table counter to zero
1181 */
1182 msg.m_nentries = 0;
1183
1184 /*
1185 * Check for all tables listed in stats hashtable if they still exist.
1186 */
1187 hash_seq_init(&hstat, dbentry->tables);
1188 while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&hstat)) != NULL)
1189 {
1190 Oid tabid = tabentry->tableid;
1191
1192 CHECK_FOR_INTERRUPTS();
1193
1194 if (hash_search(htab, (void *) &tabid, HASH_FIND, NULL) != NULL)
1195 continue;
1196
1197 /*
1198 * Not there, so add this table's Oid to the message
1199 */
1200 msg.m_tableid[msg.m_nentries++] = tabid;
1201
1202 /*
1203 * If the message is full, send it out and reinitialize to empty
1204 */
1205 if (msg.m_nentries >= PGSTAT_NUM_TABPURGE)
1206 {
1207 len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
1208 + msg.m_nentries * sizeof(Oid);
1209
1210 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
1211 msg.m_databaseid = MyDatabaseId;
1212 pgstat_send(&msg, len);
1213
1214 msg.m_nentries = 0;
1215 }
1216 }
1217
1218 /*
1219 * Send the rest
1220 */
1221 if (msg.m_nentries > 0)
1222 {
1223 len = offsetof(PgStat_MsgTabpurge, m_tableid[0])
1224 + msg.m_nentries * sizeof(Oid);
1225
1226 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
1227 msg.m_databaseid = MyDatabaseId;
1228 pgstat_send(&msg, len);
1229 }
1230
1231 /* Clean up */
1232 hash_destroy(htab);
1233
1234 /*
1235 * Now repeat the above steps for functions. However, we needn't bother
1236 * in the common case where no function stats are being collected.
1237 */
1238 if (dbentry->functions != NULL &&
1239 hash_get_num_entries(dbentry->functions) > 0)
1240 {
1241 htab = pgstat_collect_oids(ProcedureRelationId, Anum_pg_proc_oid);
1242
1243 pgstat_setheader(&f_msg.m_hdr, PGSTAT_MTYPE_FUNCPURGE);
1244 f_msg.m_databaseid = MyDatabaseId;
1245 f_msg.m_nentries = 0;
1246
1247 hash_seq_init(&hstat, dbentry->functions);
1248 while ((funcentry = (PgStat_StatFuncEntry *) hash_seq_search(&hstat)) != NULL)
1249 {
1250 Oid funcid = funcentry->functionid;
1251
1252 CHECK_FOR_INTERRUPTS();
1253
1254 if (hash_search(htab, (void *) &funcid, HASH_FIND, NULL) != NULL)
1255 continue;
1256
1257 /*
1258 * Not there, so add this function's Oid to the message
1259 */
1260 f_msg.m_functionid[f_msg.m_nentries++] = funcid;
1261
1262 /*
1263 * If the message is full, send it out and reinitialize to empty
1264 */
1265 if (f_msg.m_nentries >= PGSTAT_NUM_FUNCPURGE)
1266 {
1267 len = offsetof(PgStat_MsgFuncpurge, m_functionid[0])
1268 + f_msg.m_nentries * sizeof(Oid);
1269
1270 pgstat_send(&f_msg, len);
1271
1272 f_msg.m_nentries = 0;
1273 }
1274 }
1275
1276 /*
1277 * Send the rest
1278 */
1279 if (f_msg.m_nentries > 0)
1280 {
1281 len = offsetof(PgStat_MsgFuncpurge, m_functionid[0])
1282 + f_msg.m_nentries * sizeof(Oid);
1283
1284 pgstat_send(&f_msg, len);
1285 }
1286
1287 hash_destroy(htab);
1288 }
1289 }
1290
1291
1292 /* ----------
1293 * pgstat_collect_oids() -
1294 *
1295 * Collect the OIDs of all objects listed in the specified system catalog
1296 * into a temporary hash table. Caller should hash_destroy the result
1297 * when done with it. (However, we make the table in CurrentMemoryContext
1298 * so that it will be freed properly in event of an error.)
1299 * ----------
1300 */
1301 static HTAB *
pgstat_collect_oids(Oid catalogid,AttrNumber anum_oid)1302 pgstat_collect_oids(Oid catalogid, AttrNumber anum_oid)
1303 {
1304 HTAB *htab;
1305 HASHCTL hash_ctl;
1306 Relation rel;
1307 TableScanDesc scan;
1308 HeapTuple tup;
1309 Snapshot snapshot;
1310
1311 hash_ctl.keysize = sizeof(Oid);
1312 hash_ctl.entrysize = sizeof(Oid);
1313 hash_ctl.hcxt = CurrentMemoryContext;
1314 htab = hash_create("Temporary table of OIDs",
1315 PGSTAT_TAB_HASH_SIZE,
1316 &hash_ctl,
1317 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
1318
1319 rel = table_open(catalogid, AccessShareLock);
1320 snapshot = RegisterSnapshot(GetLatestSnapshot());
1321 scan = table_beginscan(rel, snapshot, 0, NULL);
1322 while ((tup = heap_getnext(scan, ForwardScanDirection)) != NULL)
1323 {
1324 Oid thisoid;
1325 bool isnull;
1326
1327 thisoid = heap_getattr(tup, anum_oid, RelationGetDescr(rel), &isnull);
1328 Assert(!isnull);
1329
1330 CHECK_FOR_INTERRUPTS();
1331
1332 (void) hash_search(htab, (void *) &thisoid, HASH_ENTER, NULL);
1333 }
1334 table_endscan(scan);
1335 UnregisterSnapshot(snapshot);
1336 table_close(rel, AccessShareLock);
1337
1338 return htab;
1339 }
1340
1341
1342 /* ----------
1343 * pgstat_drop_database() -
1344 *
1345 * Tell the collector that we just dropped a database.
1346 * (If the message gets lost, we will still clean the dead DB eventually
1347 * via future invocations of pgstat_vacuum_stat().)
1348 * ----------
1349 */
1350 void
pgstat_drop_database(Oid databaseid)1351 pgstat_drop_database(Oid databaseid)
1352 {
1353 PgStat_MsgDropdb msg;
1354
1355 if (pgStatSock == PGINVALID_SOCKET)
1356 return;
1357
1358 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DROPDB);
1359 msg.m_databaseid = databaseid;
1360 pgstat_send(&msg, sizeof(msg));
1361 }
1362
1363
/* ----------
 * pgstat_drop_relation() -
 *
 *	Tell the collector that the relation with OID relid, in our own
 *	database, was dropped.  This is done with a single-entry tabpurge
 *	message.
 *
 *	If the message gets lost, the dead entry is still cleaned up
 *	eventually by future invocations of pgstat_vacuum_stat().
 *
 *	Currently not used for lack of any good place to call it; we rely
 *	entirely on pgstat_vacuum_stat() to clean out stats for dead rels.
 * ----------
 */
#ifdef NOT_USED
void
pgstat_drop_relation(Oid relid)
{
	PgStat_MsgTabpurge msg;
	int			msglen;

	if (pgStatSock == PGINVALID_SOCKET)
		return;

	/* a purge list with exactly one element */
	msg.m_nentries = 1;
	msg.m_tableid[0] = relid;

	/* transmit the fixed part of the message plus that one Oid */
	msglen = offsetof(PgStat_MsgTabpurge, m_tableid[0]) + sizeof(Oid);

	msg.m_databaseid = MyDatabaseId;
	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
	pgstat_send(&msg, msglen);
}
#endif							/* NOT_USED */
1395
1396 /* ----------
1397 * pgstat_reset_counters() -
1398 *
1399 * Tell the statistics collector to reset counters for our database.
1400 *
1401 * Permission checking for this function is managed through the normal
1402 * GRANT system.
1403 * ----------
1404 */
1405 void
pgstat_reset_counters(void)1406 pgstat_reset_counters(void)
1407 {
1408 PgStat_MsgResetcounter msg;
1409
1410 if (pgStatSock == PGINVALID_SOCKET)
1411 return;
1412
1413 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETCOUNTER);
1414 msg.m_databaseid = MyDatabaseId;
1415 pgstat_send(&msg, sizeof(msg));
1416 }
1417
1418 /* ----------
1419 * pgstat_reset_shared_counters() -
1420 *
1421 * Tell the statistics collector to reset cluster-wide shared counters.
1422 *
1423 * Permission checking for this function is managed through the normal
1424 * GRANT system.
1425 * ----------
1426 */
1427 void
pgstat_reset_shared_counters(const char * target)1428 pgstat_reset_shared_counters(const char *target)
1429 {
1430 PgStat_MsgResetsharedcounter msg;
1431
1432 if (pgStatSock == PGINVALID_SOCKET)
1433 return;
1434
1435 if (strcmp(target, "archiver") == 0)
1436 msg.m_resettarget = RESET_ARCHIVER;
1437 else if (strcmp(target, "bgwriter") == 0)
1438 msg.m_resettarget = RESET_BGWRITER;
1439 else if (strcmp(target, "wal") == 0)
1440 msg.m_resettarget = RESET_WAL;
1441 else
1442 ereport(ERROR,
1443 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1444 errmsg("unrecognized reset target: \"%s\"", target),
1445 errhint("Target must be \"archiver\", \"bgwriter\", or \"wal\".")));
1446
1447 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSHAREDCOUNTER);
1448 pgstat_send(&msg, sizeof(msg));
1449 }
1450
1451 /* ----------
1452 * pgstat_reset_single_counter() -
1453 *
1454 * Tell the statistics collector to reset a single counter.
1455 *
1456 * Permission checking for this function is managed through the normal
1457 * GRANT system.
1458 * ----------
1459 */
1460 void
pgstat_reset_single_counter(Oid objoid,PgStat_Single_Reset_Type type)1461 pgstat_reset_single_counter(Oid objoid, PgStat_Single_Reset_Type type)
1462 {
1463 PgStat_MsgResetsinglecounter msg;
1464
1465 if (pgStatSock == PGINVALID_SOCKET)
1466 return;
1467
1468 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSINGLECOUNTER);
1469 msg.m_databaseid = MyDatabaseId;
1470 msg.m_resettype = type;
1471 msg.m_objectid = objoid;
1472
1473 pgstat_send(&msg, sizeof(msg));
1474 }
1475
1476 /* ----------
1477 * pgstat_reset_slru_counter() -
1478 *
1479 * Tell the statistics collector to reset a single SLRU counter, or all
1480 * SLRU counters (when name is null).
1481 *
1482 * Permission checking for this function is managed through the normal
1483 * GRANT system.
1484 * ----------
1485 */
1486 void
pgstat_reset_slru_counter(const char * name)1487 pgstat_reset_slru_counter(const char *name)
1488 {
1489 PgStat_MsgResetslrucounter msg;
1490
1491 if (pgStatSock == PGINVALID_SOCKET)
1492 return;
1493
1494 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSLRUCOUNTER);
1495 msg.m_index = (name) ? pgstat_slru_index(name) : -1;
1496
1497 pgstat_send(&msg, sizeof(msg));
1498 }
1499
1500 /* ----------
1501 * pgstat_reset_replslot_counter() -
1502 *
1503 * Tell the statistics collector to reset a single replication slot
1504 * counter, or all replication slots counters (when name is null).
1505 *
1506 * Permission checking for this function is managed through the normal
1507 * GRANT system.
1508 * ----------
1509 */
1510 void
pgstat_reset_replslot_counter(const char * name)1511 pgstat_reset_replslot_counter(const char *name)
1512 {
1513 PgStat_MsgResetreplslotcounter msg;
1514
1515 if (pgStatSock == PGINVALID_SOCKET)
1516 return;
1517
1518 if (name)
1519 {
1520 namestrcpy(&msg.m_slotname, name);
1521 msg.clearall = false;
1522 }
1523 else
1524 msg.clearall = true;
1525
1526 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETREPLSLOTCOUNTER);
1527
1528 pgstat_send(&msg, sizeof(msg));
1529 }
1530
1531 /* ----------
1532 * pgstat_report_autovac() -
1533 *
1534 * Called from autovacuum.c to report startup of an autovacuum process.
1535 * We are called before InitPostgres is done, so can't rely on MyDatabaseId;
1536 * the db OID must be passed in, instead.
1537 * ----------
1538 */
1539 void
pgstat_report_autovac(Oid dboid)1540 pgstat_report_autovac(Oid dboid)
1541 {
1542 PgStat_MsgAutovacStart msg;
1543
1544 if (pgStatSock == PGINVALID_SOCKET)
1545 return;
1546
1547 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_AUTOVAC_START);
1548 msg.m_databaseid = dboid;
1549 msg.m_start_time = GetCurrentTimestamp();
1550
1551 pgstat_send(&msg, sizeof(msg));
1552 }
1553
1554
1555 /* ---------
1556 * pgstat_report_vacuum() -
1557 *
1558 * Tell the collector about the table we just vacuumed.
1559 * ---------
1560 */
1561 void
pgstat_report_vacuum(Oid tableoid,bool shared,PgStat_Counter livetuples,PgStat_Counter deadtuples)1562 pgstat_report_vacuum(Oid tableoid, bool shared,
1563 PgStat_Counter livetuples, PgStat_Counter deadtuples)
1564 {
1565 PgStat_MsgVacuum msg;
1566
1567 if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
1568 return;
1569
1570 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_VACUUM);
1571 msg.m_databaseid = shared ? InvalidOid : MyDatabaseId;
1572 msg.m_tableoid = tableoid;
1573 msg.m_autovacuum = IsAutoVacuumWorkerProcess();
1574 msg.m_vacuumtime = GetCurrentTimestamp();
1575 msg.m_live_tuples = livetuples;
1576 msg.m_dead_tuples = deadtuples;
1577 pgstat_send(&msg, sizeof(msg));
1578 }
1579
1580 /* --------
1581 * pgstat_report_analyze() -
1582 *
1583 * Tell the collector about the table we just analyzed.
1584 *
1585 * Caller must provide new live- and dead-tuples estimates, as well as a
1586 * flag indicating whether to reset the changes_since_analyze counter.
1587 * --------
1588 */
1589 void
pgstat_report_analyze(Relation rel,PgStat_Counter livetuples,PgStat_Counter deadtuples,bool resetcounter)1590 pgstat_report_analyze(Relation rel,
1591 PgStat_Counter livetuples, PgStat_Counter deadtuples,
1592 bool resetcounter)
1593 {
1594 PgStat_MsgAnalyze msg;
1595
1596 if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
1597 return;
1598
1599 /*
1600 * Unlike VACUUM, ANALYZE might be running inside a transaction that has
1601 * already inserted and/or deleted rows in the target table. ANALYZE will
1602 * have counted such rows as live or dead respectively. Because we will
1603 * report our counts of such rows at transaction end, we should subtract
1604 * off these counts from what we send to the collector now, else they'll
1605 * be double-counted after commit. (This approach also ensures that the
1606 * collector ends up with the right numbers if we abort instead of
1607 * committing.)
1608 *
1609 * Waste no time on partitioned tables, though.
1610 */
1611 if (rel->pgstat_info != NULL &&
1612 rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1613 {
1614 PgStat_TableXactStatus *trans;
1615
1616 for (trans = rel->pgstat_info->trans; trans; trans = trans->upper)
1617 {
1618 livetuples -= trans->tuples_inserted - trans->tuples_deleted;
1619 deadtuples -= trans->tuples_updated + trans->tuples_deleted;
1620 }
1621 /* count stuff inserted by already-aborted subxacts, too */
1622 deadtuples -= rel->pgstat_info->t_counts.t_delta_dead_tuples;
1623 /* Since ANALYZE's counts are estimates, we could have underflowed */
1624 livetuples = Max(livetuples, 0);
1625 deadtuples = Max(deadtuples, 0);
1626 }
1627
1628 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ANALYZE);
1629 msg.m_databaseid = rel->rd_rel->relisshared ? InvalidOid : MyDatabaseId;
1630 msg.m_tableoid = RelationGetRelid(rel);
1631 msg.m_autovacuum = IsAutoVacuumWorkerProcess();
1632 msg.m_resetcounter = resetcounter;
1633 msg.m_analyzetime = GetCurrentTimestamp();
1634 msg.m_live_tuples = livetuples;
1635 msg.m_dead_tuples = deadtuples;
1636 pgstat_send(&msg, sizeof(msg));
1637 }
1638
1639 /* --------
1640 * pgstat_report_recovery_conflict() -
1641 *
1642 * Tell the collector about a Hot Standby recovery conflict.
1643 * --------
1644 */
1645 void
pgstat_report_recovery_conflict(int reason)1646 pgstat_report_recovery_conflict(int reason)
1647 {
1648 PgStat_MsgRecoveryConflict msg;
1649
1650 if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
1651 return;
1652
1653 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RECOVERYCONFLICT);
1654 msg.m_databaseid = MyDatabaseId;
1655 msg.m_reason = reason;
1656 pgstat_send(&msg, sizeof(msg));
1657 }
1658
1659 /* --------
1660 * pgstat_report_deadlock() -
1661 *
1662 * Tell the collector about a deadlock detected.
1663 * --------
1664 */
1665 void
pgstat_report_deadlock(void)1666 pgstat_report_deadlock(void)
1667 {
1668 PgStat_MsgDeadlock msg;
1669
1670 if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
1671 return;
1672
1673 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DEADLOCK);
1674 msg.m_databaseid = MyDatabaseId;
1675 pgstat_send(&msg, sizeof(msg));
1676 }
1677
1678
1679
1680 /* --------
1681 * pgstat_report_checksum_failures_in_db() -
1682 *
1683 * Tell the collector about one or more checksum failures.
1684 * --------
1685 */
1686 void
pgstat_report_checksum_failures_in_db(Oid dboid,int failurecount)1687 pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount)
1688 {
1689 PgStat_MsgChecksumFailure msg;
1690
1691 if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
1692 return;
1693
1694 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_CHECKSUMFAILURE);
1695 msg.m_databaseid = dboid;
1696 msg.m_failurecount = failurecount;
1697 msg.m_failure_time = GetCurrentTimestamp();
1698
1699 pgstat_send(&msg, sizeof(msg));
1700 }
1701
1702 /* --------
1703 * pgstat_report_checksum_failure() -
1704 *
1705 * Tell the collector about a checksum failure.
1706 * --------
1707 */
1708 void
pgstat_report_checksum_failure(void)1709 pgstat_report_checksum_failure(void)
1710 {
1711 pgstat_report_checksum_failures_in_db(MyDatabaseId, 1);
1712 }
1713
1714 /* --------
1715 * pgstat_report_tempfile() -
1716 *
1717 * Tell the collector about a temporary file.
1718 * --------
1719 */
1720 void
pgstat_report_tempfile(size_t filesize)1721 pgstat_report_tempfile(size_t filesize)
1722 {
1723 PgStat_MsgTempFile msg;
1724
1725 if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
1726 return;
1727
1728 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TEMPFILE);
1729 msg.m_databaseid = MyDatabaseId;
1730 msg.m_filesize = filesize;
1731 pgstat_send(&msg, sizeof(msg));
1732 }
1733
1734 /* --------
1735 * pgstat_report_connect() -
1736 *
1737 * Tell the collector about a new connection.
1738 * --------
1739 */
1740 void
pgstat_report_connect(Oid dboid)1741 pgstat_report_connect(Oid dboid)
1742 {
1743 PgStat_MsgConnect msg;
1744
1745 if (!pgstat_should_report_connstat())
1746 return;
1747
1748 pgLastSessionReportTime = MyStartTimestamp;
1749
1750 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_CONNECT);
1751 msg.m_databaseid = MyDatabaseId;
1752 pgstat_send(&msg, sizeof(PgStat_MsgConnect));
1753 }
1754
1755 /* --------
1756 * pgstat_report_disconnect() -
1757 *
1758 * Tell the collector about a disconnect.
1759 * --------
1760 */
1761 static void
pgstat_report_disconnect(Oid dboid)1762 pgstat_report_disconnect(Oid dboid)
1763 {
1764 PgStat_MsgDisconnect msg;
1765
1766 if (!pgstat_should_report_connstat())
1767 return;
1768
1769 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DISCONNECT);
1770 msg.m_databaseid = MyDatabaseId;
1771 msg.m_cause = pgStatSessionEndCause;
1772 pgstat_send(&msg, sizeof(PgStat_MsgDisconnect));
1773 }
1774
1775 /* --------
1776 * pgstat_should_report_connstats() -
1777 *
1778 * We report session statistics only for normal backend processes. Parallel
1779 * workers run in parallel, so they don't contribute to session times, even
1780 * though they use CPU time. Walsender processes could be considered here,
1781 * but they have different session characteristics from normal backends (for
1782 * example, they are always "active"), so they would skew session statistics.
1783 * ----------
1784 */
1785 static bool
pgstat_should_report_connstat(void)1786 pgstat_should_report_connstat(void)
1787 {
1788 return MyBackendType == B_BACKEND;
1789 }
1790
1791 /* ----------
1792 * pgstat_report_replslot() -
1793 *
1794 * Tell the collector about replication slot statistics.
1795 * ----------
1796 */
1797 void
pgstat_report_replslot(const PgStat_StatReplSlotEntry * repSlotStat)1798 pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat)
1799 {
1800 PgStat_MsgReplSlot msg;
1801
1802 /*
1803 * Prepare and send the message
1804 */
1805 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
1806 namestrcpy(&msg.m_slotname, NameStr(repSlotStat->slotname));
1807 msg.m_create = false;
1808 msg.m_drop = false;
1809 msg.m_spill_txns = repSlotStat->spill_txns;
1810 msg.m_spill_count = repSlotStat->spill_count;
1811 msg.m_spill_bytes = repSlotStat->spill_bytes;
1812 msg.m_stream_txns = repSlotStat->stream_txns;
1813 msg.m_stream_count = repSlotStat->stream_count;
1814 msg.m_stream_bytes = repSlotStat->stream_bytes;
1815 msg.m_total_txns = repSlotStat->total_txns;
1816 msg.m_total_bytes = repSlotStat->total_bytes;
1817 pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
1818 }
1819
1820 /* ----------
1821 * pgstat_report_replslot_create() -
1822 *
1823 * Tell the collector about creating the replication slot.
1824 * ----------
1825 */
1826 void
pgstat_report_replslot_create(const char * slotname)1827 pgstat_report_replslot_create(const char *slotname)
1828 {
1829 PgStat_MsgReplSlot msg;
1830
1831 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
1832 namestrcpy(&msg.m_slotname, slotname);
1833 msg.m_create = true;
1834 msg.m_drop = false;
1835 pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
1836 }
1837
1838 /* ----------
1839 * pgstat_report_replslot_drop() -
1840 *
1841 * Tell the collector about dropping the replication slot.
1842 * ----------
1843 */
1844 void
pgstat_report_replslot_drop(const char * slotname)1845 pgstat_report_replslot_drop(const char *slotname)
1846 {
1847 PgStat_MsgReplSlot msg;
1848
1849 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
1850 namestrcpy(&msg.m_slotname, slotname);
1851 msg.m_create = false;
1852 msg.m_drop = true;
1853 pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
1854 }
1855
1856 /* ----------
1857 * pgstat_ping() -
1858 *
1859 * Send some junk data to the collector to increase traffic.
1860 * ----------
1861 */
1862 void
pgstat_ping(void)1863 pgstat_ping(void)
1864 {
1865 PgStat_MsgDummy msg;
1866
1867 if (pgStatSock == PGINVALID_SOCKET)
1868 return;
1869
1870 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_DUMMY);
1871 pgstat_send(&msg, sizeof(msg));
1872 }
1873
1874 /* ----------
1875 * pgstat_send_inquiry() -
1876 *
1877 * Notify collector that we need fresh data.
1878 * ----------
1879 */
1880 static void
pgstat_send_inquiry(TimestampTz clock_time,TimestampTz cutoff_time,Oid databaseid)1881 pgstat_send_inquiry(TimestampTz clock_time, TimestampTz cutoff_time, Oid databaseid)
1882 {
1883 PgStat_MsgInquiry msg;
1884
1885 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_INQUIRY);
1886 msg.clock_time = clock_time;
1887 msg.cutoff_time = cutoff_time;
1888 msg.databaseid = databaseid;
1889 pgstat_send(&msg, sizeof(msg));
1890 }
1891
1892
1893 /*
1894 * Initialize function call usage data.
1895 * Called by the executor before invoking a function.
1896 */
1897 void
pgstat_init_function_usage(FunctionCallInfo fcinfo,PgStat_FunctionCallUsage * fcu)1898 pgstat_init_function_usage(FunctionCallInfo fcinfo,
1899 PgStat_FunctionCallUsage *fcu)
1900 {
1901 PgStat_BackendFunctionEntry *htabent;
1902 bool found;
1903
1904 if (pgstat_track_functions <= fcinfo->flinfo->fn_stats)
1905 {
1906 /* stats not wanted */
1907 fcu->fs = NULL;
1908 return;
1909 }
1910
1911 if (!pgStatFunctions)
1912 {
1913 /* First time through - initialize function stat table */
1914 HASHCTL hash_ctl;
1915
1916 hash_ctl.keysize = sizeof(Oid);
1917 hash_ctl.entrysize = sizeof(PgStat_BackendFunctionEntry);
1918 pgStatFunctions = hash_create("Function stat entries",
1919 PGSTAT_FUNCTION_HASH_SIZE,
1920 &hash_ctl,
1921 HASH_ELEM | HASH_BLOBS);
1922 }
1923
1924 /* Get the stats entry for this function, create if necessary */
1925 htabent = hash_search(pgStatFunctions, &fcinfo->flinfo->fn_oid,
1926 HASH_ENTER, &found);
1927 if (!found)
1928 MemSet(&htabent->f_counts, 0, sizeof(PgStat_FunctionCounts));
1929
1930 fcu->fs = &htabent->f_counts;
1931
1932 /* save stats for this function, later used to compensate for recursion */
1933 fcu->save_f_total_time = htabent->f_counts.f_total_time;
1934
1935 /* save current backend-wide total time */
1936 fcu->save_total = total_func_time;
1937
1938 /* get clock time as of function start */
1939 INSTR_TIME_SET_CURRENT(fcu->f_start);
1940 }
1941
1942 /*
1943 * find_funcstat_entry - find any existing PgStat_BackendFunctionEntry entry
1944 * for specified function
1945 *
1946 * If no entry, return NULL, don't create a new one
1947 */
1948 PgStat_BackendFunctionEntry *
find_funcstat_entry(Oid func_id)1949 find_funcstat_entry(Oid func_id)
1950 {
1951 if (pgStatFunctions == NULL)
1952 return NULL;
1953
1954 return (PgStat_BackendFunctionEntry *) hash_search(pgStatFunctions,
1955 (void *) &func_id,
1956 HASH_FIND, NULL);
1957 }
1958
1959 /*
1960 * Calculate function call usage and update stat counters.
1961 * Called by the executor after invoking a function.
1962 *
1963 * In the case of a set-returning function that runs in value-per-call mode,
1964 * we will see multiple pgstat_init_function_usage/pgstat_end_function_usage
1965 * calls for what the user considers a single call of the function. The
1966 * finalize flag should be TRUE on the last call.
1967 */
1968 void
pgstat_end_function_usage(PgStat_FunctionCallUsage * fcu,bool finalize)1969 pgstat_end_function_usage(PgStat_FunctionCallUsage *fcu, bool finalize)
1970 {
1971 PgStat_FunctionCounts *fs = fcu->fs;
1972 instr_time f_total;
1973 instr_time f_others;
1974 instr_time f_self;
1975
1976 /* stats not wanted? */
1977 if (fs == NULL)
1978 return;
1979
1980 /* total elapsed time in this function call */
1981 INSTR_TIME_SET_CURRENT(f_total);
1982 INSTR_TIME_SUBTRACT(f_total, fcu->f_start);
1983
1984 /* self usage: elapsed minus anything already charged to other calls */
1985 f_others = total_func_time;
1986 INSTR_TIME_SUBTRACT(f_others, fcu->save_total);
1987 f_self = f_total;
1988 INSTR_TIME_SUBTRACT(f_self, f_others);
1989
1990 /* update backend-wide total time */
1991 INSTR_TIME_ADD(total_func_time, f_self);
1992
1993 /*
1994 * Compute the new f_total_time as the total elapsed time added to the
1995 * pre-call value of f_total_time. This is necessary to avoid
1996 * double-counting any time taken by recursive calls of myself. (We do
1997 * not need any similar kluge for self time, since that already excludes
1998 * any recursive calls.)
1999 */
2000 INSTR_TIME_ADD(f_total, fcu->save_f_total_time);
2001
2002 /* update counters in function stats table */
2003 if (finalize)
2004 fs->f_numcalls++;
2005 fs->f_total_time = f_total;
2006 INSTR_TIME_ADD(fs->f_self_time, f_self);
2007
2008 /* indicate that we have something to send */
2009 have_function_stats = true;
2010 }
2011
2012
2013 /* ----------
2014 * pgstat_initstats() -
2015 *
2016 * Initialize a relcache entry to count access statistics.
2017 * Called whenever a relation is opened.
2018 *
2019 * We assume that a relcache entry's pgstat_info field is zeroed by
2020 * relcache.c when the relcache entry is made; thereafter it is long-lived
2021 * data. We can avoid repeated searches of the TabStatus arrays when the
2022 * same relation is touched repeatedly within a transaction.
2023 * ----------
2024 */
2025 void
pgstat_initstats(Relation rel)2026 pgstat_initstats(Relation rel)
2027 {
2028 Oid rel_id = rel->rd_id;
2029 char relkind = rel->rd_rel->relkind;
2030
2031 /*
2032 * We only count stats for relations with storage and partitioned tables
2033 */
2034 if (!RELKIND_HAS_STORAGE(relkind) && relkind != RELKIND_PARTITIONED_TABLE)
2035 {
2036 rel->pgstat_info = NULL;
2037 return;
2038 }
2039
2040 if (pgStatSock == PGINVALID_SOCKET || !pgstat_track_counts)
2041 {
2042 /* We're not counting at all */
2043 rel->pgstat_info = NULL;
2044 return;
2045 }
2046
2047 /*
2048 * If we already set up this relation in the current transaction, nothing
2049 * to do.
2050 */
2051 if (rel->pgstat_info != NULL &&
2052 rel->pgstat_info->t_id == rel_id)
2053 return;
2054
2055 /* Else find or make the PgStat_TableStatus entry, and update link */
2056 rel->pgstat_info = get_tabstat_entry(rel_id, rel->rd_rel->relisshared);
2057 }
2058
2059 /*
2060 * get_tabstat_entry - find or create a PgStat_TableStatus entry for rel
2061 */
2062 static PgStat_TableStatus *
get_tabstat_entry(Oid rel_id,bool isshared)2063 get_tabstat_entry(Oid rel_id, bool isshared)
2064 {
2065 TabStatHashEntry *hash_entry;
2066 PgStat_TableStatus *entry;
2067 TabStatusArray *tsa;
2068 bool found;
2069
2070 /*
2071 * Create hash table if we don't have it already.
2072 */
2073 if (pgStatTabHash == NULL)
2074 {
2075 HASHCTL ctl;
2076
2077 ctl.keysize = sizeof(Oid);
2078 ctl.entrysize = sizeof(TabStatHashEntry);
2079
2080 pgStatTabHash = hash_create("pgstat TabStatusArray lookup hash table",
2081 TABSTAT_QUANTUM,
2082 &ctl,
2083 HASH_ELEM | HASH_BLOBS);
2084 }
2085
2086 /*
2087 * Find an entry or create a new one.
2088 */
2089 hash_entry = hash_search(pgStatTabHash, &rel_id, HASH_ENTER, &found);
2090 if (!found)
2091 {
2092 /* initialize new entry with null pointer */
2093 hash_entry->tsa_entry = NULL;
2094 }
2095
2096 /*
2097 * If entry is already valid, we're done.
2098 */
2099 if (hash_entry->tsa_entry)
2100 return hash_entry->tsa_entry;
2101
2102 /*
2103 * Locate the first pgStatTabList entry with free space, making a new list
2104 * entry if needed. Note that we could get an OOM failure here, but if so
2105 * we have left the hashtable and the list in a consistent state.
2106 */
2107 if (pgStatTabList == NULL)
2108 {
2109 /* Set up first pgStatTabList entry */
2110 pgStatTabList = (TabStatusArray *)
2111 MemoryContextAllocZero(TopMemoryContext,
2112 sizeof(TabStatusArray));
2113 }
2114
2115 tsa = pgStatTabList;
2116 while (tsa->tsa_used >= TABSTAT_QUANTUM)
2117 {
2118 if (tsa->tsa_next == NULL)
2119 tsa->tsa_next = (TabStatusArray *)
2120 MemoryContextAllocZero(TopMemoryContext,
2121 sizeof(TabStatusArray));
2122 tsa = tsa->tsa_next;
2123 }
2124
2125 /*
2126 * Allocate a PgStat_TableStatus entry within this list entry. We assume
2127 * the entry was already zeroed, either at creation or after last use.
2128 */
2129 entry = &tsa->tsa_entries[tsa->tsa_used++];
2130 entry->t_id = rel_id;
2131 entry->t_shared = isshared;
2132
2133 /*
2134 * Now we can fill the entry in pgStatTabHash.
2135 */
2136 hash_entry->tsa_entry = entry;
2137
2138 return entry;
2139 }
2140
2141 /*
2142 * find_tabstat_entry - find any existing PgStat_TableStatus entry for rel
2143 *
2144 * If no entry, return NULL, don't create a new one
2145 *
2146 * Note: if we got an error in the most recent execution of pgstat_report_stat,
2147 * it's possible that an entry exists but there's no hashtable entry for it.
2148 * That's okay, we'll treat this case as "doesn't exist".
2149 */
2150 PgStat_TableStatus *
find_tabstat_entry(Oid rel_id)2151 find_tabstat_entry(Oid rel_id)
2152 {
2153 TabStatHashEntry *hash_entry;
2154
2155 /* If hashtable doesn't exist, there are no entries at all */
2156 if (!pgStatTabHash)
2157 return NULL;
2158
2159 hash_entry = hash_search(pgStatTabHash, &rel_id, HASH_FIND, NULL);
2160 if (!hash_entry)
2161 return NULL;
2162
2163 /* Note that this step could also return NULL, but that's correct */
2164 return hash_entry->tsa_entry;
2165 }
2166
2167 /*
2168 * get_tabstat_stack_level - add a new (sub)transaction stack entry if needed
2169 */
2170 static PgStat_SubXactStatus *
get_tabstat_stack_level(int nest_level)2171 get_tabstat_stack_level(int nest_level)
2172 {
2173 PgStat_SubXactStatus *xact_state;
2174
2175 xact_state = pgStatXactStack;
2176 if (xact_state == NULL || xact_state->nest_level != nest_level)
2177 {
2178 xact_state = (PgStat_SubXactStatus *)
2179 MemoryContextAlloc(TopTransactionContext,
2180 sizeof(PgStat_SubXactStatus));
2181 xact_state->nest_level = nest_level;
2182 xact_state->prev = pgStatXactStack;
2183 xact_state->first = NULL;
2184 pgStatXactStack = xact_state;
2185 }
2186 return xact_state;
2187 }
2188
2189 /*
2190 * add_tabstat_xact_level - add a new (sub)transaction state record
2191 */
2192 static void
add_tabstat_xact_level(PgStat_TableStatus * pgstat_info,int nest_level)2193 add_tabstat_xact_level(PgStat_TableStatus *pgstat_info, int nest_level)
2194 {
2195 PgStat_SubXactStatus *xact_state;
2196 PgStat_TableXactStatus *trans;
2197
2198 /*
2199 * If this is the first rel to be modified at the current nest level, we
2200 * first have to push a transaction stack entry.
2201 */
2202 xact_state = get_tabstat_stack_level(nest_level);
2203
2204 /* Now make a per-table stack entry */
2205 trans = (PgStat_TableXactStatus *)
2206 MemoryContextAllocZero(TopTransactionContext,
2207 sizeof(PgStat_TableXactStatus));
2208 trans->nest_level = nest_level;
2209 trans->upper = pgstat_info->trans;
2210 trans->parent = pgstat_info;
2211 trans->next = xact_state->first;
2212 xact_state->first = trans;
2213 pgstat_info->trans = trans;
2214 }
2215
2216 /*
2217 * pgstat_count_heap_insert - count a tuple insertion of n tuples
2218 */
2219 void
pgstat_count_heap_insert(Relation rel,PgStat_Counter n)2220 pgstat_count_heap_insert(Relation rel, PgStat_Counter n)
2221 {
2222 PgStat_TableStatus *pgstat_info = rel->pgstat_info;
2223
2224 if (pgstat_info != NULL)
2225 {
2226 /* We have to log the effect at the proper transactional level */
2227 int nest_level = GetCurrentTransactionNestLevel();
2228
2229 if (pgstat_info->trans == NULL ||
2230 pgstat_info->trans->nest_level != nest_level)
2231 add_tabstat_xact_level(pgstat_info, nest_level);
2232
2233 pgstat_info->trans->tuples_inserted += n;
2234 }
2235 }
2236
2237 /*
2238 * pgstat_count_heap_update - count a tuple update
2239 */
2240 void
pgstat_count_heap_update(Relation rel,bool hot)2241 pgstat_count_heap_update(Relation rel, bool hot)
2242 {
2243 PgStat_TableStatus *pgstat_info = rel->pgstat_info;
2244
2245 if (pgstat_info != NULL)
2246 {
2247 /* We have to log the effect at the proper transactional level */
2248 int nest_level = GetCurrentTransactionNestLevel();
2249
2250 if (pgstat_info->trans == NULL ||
2251 pgstat_info->trans->nest_level != nest_level)
2252 add_tabstat_xact_level(pgstat_info, nest_level);
2253
2254 pgstat_info->trans->tuples_updated++;
2255
2256 /* t_tuples_hot_updated is nontransactional, so just advance it */
2257 if (hot)
2258 pgstat_info->t_counts.t_tuples_hot_updated++;
2259 }
2260 }
2261
2262 /*
2263 * pgstat_count_heap_delete - count a tuple deletion
2264 */
2265 void
pgstat_count_heap_delete(Relation rel)2266 pgstat_count_heap_delete(Relation rel)
2267 {
2268 PgStat_TableStatus *pgstat_info = rel->pgstat_info;
2269
2270 if (pgstat_info != NULL)
2271 {
2272 /* We have to log the effect at the proper transactional level */
2273 int nest_level = GetCurrentTransactionNestLevel();
2274
2275 if (pgstat_info->trans == NULL ||
2276 pgstat_info->trans->nest_level != nest_level)
2277 add_tabstat_xact_level(pgstat_info, nest_level);
2278
2279 pgstat_info->trans->tuples_deleted++;
2280 }
2281 }
2282
2283 /*
2284 * pgstat_truncate_save_counters
2285 *
2286 * Whenever a table is truncated, we save its i/u/d counters so that they can
2287 * be cleared, and if the (sub)xact that executed the truncate later aborts,
2288 * the counters can be restored to the saved (pre-truncate) values. Note we do
2289 * this on the first truncate in any particular subxact level only.
2290 */
2291 static void
pgstat_truncate_save_counters(PgStat_TableXactStatus * trans)2292 pgstat_truncate_save_counters(PgStat_TableXactStatus *trans)
2293 {
2294 if (!trans->truncated)
2295 {
2296 trans->inserted_pre_trunc = trans->tuples_inserted;
2297 trans->updated_pre_trunc = trans->tuples_updated;
2298 trans->deleted_pre_trunc = trans->tuples_deleted;
2299 trans->truncated = true;
2300 }
2301 }
2302
2303 /*
2304 * pgstat_truncate_restore_counters - restore counters when a truncate aborts
2305 */
2306 static void
pgstat_truncate_restore_counters(PgStat_TableXactStatus * trans)2307 pgstat_truncate_restore_counters(PgStat_TableXactStatus *trans)
2308 {
2309 if (trans->truncated)
2310 {
2311 trans->tuples_inserted = trans->inserted_pre_trunc;
2312 trans->tuples_updated = trans->updated_pre_trunc;
2313 trans->tuples_deleted = trans->deleted_pre_trunc;
2314 }
2315 }
2316
2317 /*
2318 * pgstat_count_truncate - update tuple counters due to truncate
2319 */
2320 void
pgstat_count_truncate(Relation rel)2321 pgstat_count_truncate(Relation rel)
2322 {
2323 PgStat_TableStatus *pgstat_info = rel->pgstat_info;
2324
2325 if (pgstat_info != NULL)
2326 {
2327 /* We have to log the effect at the proper transactional level */
2328 int nest_level = GetCurrentTransactionNestLevel();
2329
2330 if (pgstat_info->trans == NULL ||
2331 pgstat_info->trans->nest_level != nest_level)
2332 add_tabstat_xact_level(pgstat_info, nest_level);
2333
2334 pgstat_truncate_save_counters(pgstat_info->trans);
2335 pgstat_info->trans->tuples_inserted = 0;
2336 pgstat_info->trans->tuples_updated = 0;
2337 pgstat_info->trans->tuples_deleted = 0;
2338 }
2339 }
2340
2341 /*
2342 * pgstat_update_heap_dead_tuples - update dead-tuples count
2343 *
2344 * The semantics of this are that we are reporting the nontransactional
2345 * recovery of "delta" dead tuples; so t_delta_dead_tuples decreases
2346 * rather than increasing, and the change goes straight into the per-table
2347 * counter, not into transactional state.
2348 */
2349 void
pgstat_update_heap_dead_tuples(Relation rel,int delta)2350 pgstat_update_heap_dead_tuples(Relation rel, int delta)
2351 {
2352 PgStat_TableStatus *pgstat_info = rel->pgstat_info;
2353
2354 if (pgstat_info != NULL)
2355 pgstat_info->t_counts.t_delta_dead_tuples -= delta;
2356 }
2357
2358
2359 /* ----------
2360 * AtEOXact_PgStat
2361 *
2362 * Called from access/transam/xact.c at top-level transaction commit/abort.
2363 * ----------
2364 */
2365 void
AtEOXact_PgStat(bool isCommit,bool parallel)2366 AtEOXact_PgStat(bool isCommit, bool parallel)
2367 {
2368 PgStat_SubXactStatus *xact_state;
2369
2370 /* Don't count parallel worker transaction stats */
2371 if (!parallel)
2372 {
2373 /*
2374 * Count transaction commit or abort. (We use counters, not just
2375 * bools, in case the reporting message isn't sent right away.)
2376 */
2377 if (isCommit)
2378 pgStatXactCommit++;
2379 else
2380 pgStatXactRollback++;
2381 }
2382
2383 /*
2384 * Transfer transactional insert/update counts into the base tabstat
2385 * entries. We don't bother to free any of the transactional state, since
2386 * it's all in TopTransactionContext and will go away anyway.
2387 */
2388 xact_state = pgStatXactStack;
2389 if (xact_state != NULL)
2390 {
2391 PgStat_TableXactStatus *trans;
2392
2393 Assert(xact_state->nest_level == 1);
2394 Assert(xact_state->prev == NULL);
2395 for (trans = xact_state->first; trans != NULL; trans = trans->next)
2396 {
2397 PgStat_TableStatus *tabstat;
2398
2399 Assert(trans->nest_level == 1);
2400 Assert(trans->upper == NULL);
2401 tabstat = trans->parent;
2402 Assert(tabstat->trans == trans);
2403 /* restore pre-truncate stats (if any) in case of aborted xact */
2404 if (!isCommit)
2405 pgstat_truncate_restore_counters(trans);
2406 /* count attempted actions regardless of commit/abort */
2407 tabstat->t_counts.t_tuples_inserted += trans->tuples_inserted;
2408 tabstat->t_counts.t_tuples_updated += trans->tuples_updated;
2409 tabstat->t_counts.t_tuples_deleted += trans->tuples_deleted;
2410 if (isCommit)
2411 {
2412 tabstat->t_counts.t_truncated = trans->truncated;
2413 if (trans->truncated)
2414 {
2415 /* forget live/dead stats seen by backend thus far */
2416 tabstat->t_counts.t_delta_live_tuples = 0;
2417 tabstat->t_counts.t_delta_dead_tuples = 0;
2418 }
2419 /* insert adds a live tuple, delete removes one */
2420 tabstat->t_counts.t_delta_live_tuples +=
2421 trans->tuples_inserted - trans->tuples_deleted;
2422 /* update and delete each create a dead tuple */
2423 tabstat->t_counts.t_delta_dead_tuples +=
2424 trans->tuples_updated + trans->tuples_deleted;
2425 /* insert, update, delete each count as one change event */
2426 tabstat->t_counts.t_changed_tuples +=
2427 trans->tuples_inserted + trans->tuples_updated +
2428 trans->tuples_deleted;
2429 }
2430 else
2431 {
2432 /* inserted tuples are dead, deleted tuples are unaffected */
2433 tabstat->t_counts.t_delta_dead_tuples +=
2434 trans->tuples_inserted + trans->tuples_updated;
2435 /* an aborted xact generates no changed_tuple events */
2436 }
2437 tabstat->trans = NULL;
2438 }
2439 }
2440 pgStatXactStack = NULL;
2441
2442 /* Make sure any stats snapshot is thrown away */
2443 pgstat_clear_snapshot();
2444 }
2445
2446 /* ----------
2447 * AtEOSubXact_PgStat
2448 *
2449 * Called from access/transam/xact.c at subtransaction commit/abort.
2450 * ----------
2451 */
2452 void
AtEOSubXact_PgStat(bool isCommit,int nestDepth)2453 AtEOSubXact_PgStat(bool isCommit, int nestDepth)
2454 {
2455 PgStat_SubXactStatus *xact_state;
2456
2457 /*
2458 * Transfer transactional insert/update counts into the next higher
2459 * subtransaction state.
2460 */
2461 xact_state = pgStatXactStack;
2462 if (xact_state != NULL &&
2463 xact_state->nest_level >= nestDepth)
2464 {
2465 PgStat_TableXactStatus *trans;
2466 PgStat_TableXactStatus *next_trans;
2467
2468 /* delink xact_state from stack immediately to simplify reuse case */
2469 pgStatXactStack = xact_state->prev;
2470
2471 for (trans = xact_state->first; trans != NULL; trans = next_trans)
2472 {
2473 PgStat_TableStatus *tabstat;
2474
2475 next_trans = trans->next;
2476 Assert(trans->nest_level == nestDepth);
2477 tabstat = trans->parent;
2478 Assert(tabstat->trans == trans);
2479 if (isCommit)
2480 {
2481 if (trans->upper && trans->upper->nest_level == nestDepth - 1)
2482 {
2483 if (trans->truncated)
2484 {
2485 /* propagate the truncate status one level up */
2486 pgstat_truncate_save_counters(trans->upper);
2487 /* replace upper xact stats with ours */
2488 trans->upper->tuples_inserted = trans->tuples_inserted;
2489 trans->upper->tuples_updated = trans->tuples_updated;
2490 trans->upper->tuples_deleted = trans->tuples_deleted;
2491 }
2492 else
2493 {
2494 trans->upper->tuples_inserted += trans->tuples_inserted;
2495 trans->upper->tuples_updated += trans->tuples_updated;
2496 trans->upper->tuples_deleted += trans->tuples_deleted;
2497 }
2498 tabstat->trans = trans->upper;
2499 pfree(trans);
2500 }
2501 else
2502 {
2503 /*
2504 * When there isn't an immediate parent state, we can just
2505 * reuse the record instead of going through a
2506 * palloc/pfree pushup (this works since it's all in
2507 * TopTransactionContext anyway). We have to re-link it
2508 * into the parent level, though, and that might mean
2509 * pushing a new entry into the pgStatXactStack.
2510 */
2511 PgStat_SubXactStatus *upper_xact_state;
2512
2513 upper_xact_state = get_tabstat_stack_level(nestDepth - 1);
2514 trans->next = upper_xact_state->first;
2515 upper_xact_state->first = trans;
2516 trans->nest_level = nestDepth - 1;
2517 }
2518 }
2519 else
2520 {
2521 /*
2522 * On abort, update top-level tabstat counts, then forget the
2523 * subtransaction
2524 */
2525
2526 /* first restore values obliterated by truncate */
2527 pgstat_truncate_restore_counters(trans);
2528 /* count attempted actions regardless of commit/abort */
2529 tabstat->t_counts.t_tuples_inserted += trans->tuples_inserted;
2530 tabstat->t_counts.t_tuples_updated += trans->tuples_updated;
2531 tabstat->t_counts.t_tuples_deleted += trans->tuples_deleted;
2532 /* inserted tuples are dead, deleted tuples are unaffected */
2533 tabstat->t_counts.t_delta_dead_tuples +=
2534 trans->tuples_inserted + trans->tuples_updated;
2535 tabstat->trans = trans->upper;
2536 pfree(trans);
2537 }
2538 }
2539 pfree(xact_state);
2540 }
2541 }
2542
2543
2544 /*
2545 * AtPrepare_PgStat
2546 * Save the transactional stats state at 2PC transaction prepare.
2547 *
2548 * In this phase we just generate 2PC records for all the pending
2549 * transaction-dependent stats work.
2550 */
2551 void
AtPrepare_PgStat(void)2552 AtPrepare_PgStat(void)
2553 {
2554 PgStat_SubXactStatus *xact_state;
2555
2556 xact_state = pgStatXactStack;
2557 if (xact_state != NULL)
2558 {
2559 PgStat_TableXactStatus *trans;
2560
2561 Assert(xact_state->nest_level == 1);
2562 Assert(xact_state->prev == NULL);
2563 for (trans = xact_state->first; trans != NULL; trans = trans->next)
2564 {
2565 PgStat_TableStatus *tabstat;
2566 TwoPhasePgStatRecord record;
2567
2568 Assert(trans->nest_level == 1);
2569 Assert(trans->upper == NULL);
2570 tabstat = trans->parent;
2571 Assert(tabstat->trans == trans);
2572
2573 record.tuples_inserted = trans->tuples_inserted;
2574 record.tuples_updated = trans->tuples_updated;
2575 record.tuples_deleted = trans->tuples_deleted;
2576 record.inserted_pre_trunc = trans->inserted_pre_trunc;
2577 record.updated_pre_trunc = trans->updated_pre_trunc;
2578 record.deleted_pre_trunc = trans->deleted_pre_trunc;
2579 record.t_id = tabstat->t_id;
2580 record.t_shared = tabstat->t_shared;
2581 record.t_truncated = trans->truncated;
2582
2583 RegisterTwoPhaseRecord(TWOPHASE_RM_PGSTAT_ID, 0,
2584 &record, sizeof(TwoPhasePgStatRecord));
2585 }
2586 }
2587 }
2588
2589 /*
2590 * PostPrepare_PgStat
2591 * Clean up after successful PREPARE.
2592 *
2593 * All we need do here is unlink the transaction stats state from the
2594 * nontransactional state. The nontransactional action counts will be
2595 * reported to the stats collector immediately, while the effects on live
2596 * and dead tuple counts are preserved in the 2PC state file.
2597 *
2598 * Note: AtEOXact_PgStat is not called during PREPARE.
2599 */
2600 void
PostPrepare_PgStat(void)2601 PostPrepare_PgStat(void)
2602 {
2603 PgStat_SubXactStatus *xact_state;
2604
2605 /*
2606 * We don't bother to free any of the transactional state, since it's all
2607 * in TopTransactionContext and will go away anyway.
2608 */
2609 xact_state = pgStatXactStack;
2610 if (xact_state != NULL)
2611 {
2612 PgStat_TableXactStatus *trans;
2613
2614 for (trans = xact_state->first; trans != NULL; trans = trans->next)
2615 {
2616 PgStat_TableStatus *tabstat;
2617
2618 tabstat = trans->parent;
2619 tabstat->trans = NULL;
2620 }
2621 }
2622 pgStatXactStack = NULL;
2623
2624 /* Make sure any stats snapshot is thrown away */
2625 pgstat_clear_snapshot();
2626 }
2627
2628 /*
2629 * 2PC processing routine for COMMIT PREPARED case.
2630 *
2631 * Load the saved counts into our local pgstats state.
2632 */
2633 void
pgstat_twophase_postcommit(TransactionId xid,uint16 info,void * recdata,uint32 len)2634 pgstat_twophase_postcommit(TransactionId xid, uint16 info,
2635 void *recdata, uint32 len)
2636 {
2637 TwoPhasePgStatRecord *rec = (TwoPhasePgStatRecord *) recdata;
2638 PgStat_TableStatus *pgstat_info;
2639
2640 /* Find or create a tabstat entry for the rel */
2641 pgstat_info = get_tabstat_entry(rec->t_id, rec->t_shared);
2642
2643 /* Same math as in AtEOXact_PgStat, commit case */
2644 pgstat_info->t_counts.t_tuples_inserted += rec->tuples_inserted;
2645 pgstat_info->t_counts.t_tuples_updated += rec->tuples_updated;
2646 pgstat_info->t_counts.t_tuples_deleted += rec->tuples_deleted;
2647 pgstat_info->t_counts.t_truncated = rec->t_truncated;
2648 if (rec->t_truncated)
2649 {
2650 /* forget live/dead stats seen by backend thus far */
2651 pgstat_info->t_counts.t_delta_live_tuples = 0;
2652 pgstat_info->t_counts.t_delta_dead_tuples = 0;
2653 }
2654 pgstat_info->t_counts.t_delta_live_tuples +=
2655 rec->tuples_inserted - rec->tuples_deleted;
2656 pgstat_info->t_counts.t_delta_dead_tuples +=
2657 rec->tuples_updated + rec->tuples_deleted;
2658 pgstat_info->t_counts.t_changed_tuples +=
2659 rec->tuples_inserted + rec->tuples_updated +
2660 rec->tuples_deleted;
2661 }
2662
2663 /*
2664 * 2PC processing routine for ROLLBACK PREPARED case.
2665 *
2666 * Load the saved counts into our local pgstats state, but treat them
2667 * as aborted.
2668 */
2669 void
pgstat_twophase_postabort(TransactionId xid,uint16 info,void * recdata,uint32 len)2670 pgstat_twophase_postabort(TransactionId xid, uint16 info,
2671 void *recdata, uint32 len)
2672 {
2673 TwoPhasePgStatRecord *rec = (TwoPhasePgStatRecord *) recdata;
2674 PgStat_TableStatus *pgstat_info;
2675
2676 /* Find or create a tabstat entry for the rel */
2677 pgstat_info = get_tabstat_entry(rec->t_id, rec->t_shared);
2678
2679 /* Same math as in AtEOXact_PgStat, abort case */
2680 if (rec->t_truncated)
2681 {
2682 rec->tuples_inserted = rec->inserted_pre_trunc;
2683 rec->tuples_updated = rec->updated_pre_trunc;
2684 rec->tuples_deleted = rec->deleted_pre_trunc;
2685 }
2686 pgstat_info->t_counts.t_tuples_inserted += rec->tuples_inserted;
2687 pgstat_info->t_counts.t_tuples_updated += rec->tuples_updated;
2688 pgstat_info->t_counts.t_tuples_deleted += rec->tuples_deleted;
2689 pgstat_info->t_counts.t_delta_dead_tuples +=
2690 rec->tuples_inserted + rec->tuples_updated;
2691 }
2692
2693
2694 /* ----------
2695 * pgstat_fetch_stat_dbentry() -
2696 *
2697 * Support function for the SQL-callable pgstat* functions. Returns
2698 * the collected statistics for one database or NULL. NULL doesn't mean
2699 * that the database doesn't exist, it is just not yet known by the
2700 * collector, so the caller is better off to report ZERO instead.
2701 * ----------
2702 */
2703 PgStat_StatDBEntry *
pgstat_fetch_stat_dbentry(Oid dbid)2704 pgstat_fetch_stat_dbentry(Oid dbid)
2705 {
2706 /*
2707 * If not done for this transaction, read the statistics collector stats
2708 * file into some hash tables.
2709 */
2710 backend_read_statsfile();
2711
2712 /*
2713 * Lookup the requested database; return NULL if not found
2714 */
2715 return (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2716 (void *) &dbid,
2717 HASH_FIND, NULL);
2718 }
2719
2720
2721 /* ----------
2722 * pgstat_fetch_stat_tabentry() -
2723 *
2724 * Support function for the SQL-callable pgstat* functions. Returns
2725 * the collected statistics for one table or NULL. NULL doesn't mean
2726 * that the table doesn't exist, it is just not yet known by the
2727 * collector, so the caller is better off to report ZERO instead.
2728 * ----------
2729 */
2730 PgStat_StatTabEntry *
pgstat_fetch_stat_tabentry(Oid relid)2731 pgstat_fetch_stat_tabentry(Oid relid)
2732 {
2733 Oid dbid;
2734 PgStat_StatDBEntry *dbentry;
2735 PgStat_StatTabEntry *tabentry;
2736
2737 /*
2738 * If not done for this transaction, read the statistics collector stats
2739 * file into some hash tables.
2740 */
2741 backend_read_statsfile();
2742
2743 /*
2744 * Lookup our database, then look in its table hash table.
2745 */
2746 dbid = MyDatabaseId;
2747 dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2748 (void *) &dbid,
2749 HASH_FIND, NULL);
2750 if (dbentry != NULL && dbentry->tables != NULL)
2751 {
2752 tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
2753 (void *) &relid,
2754 HASH_FIND, NULL);
2755 if (tabentry)
2756 return tabentry;
2757 }
2758
2759 /*
2760 * If we didn't find it, maybe it's a shared table.
2761 */
2762 dbid = InvalidOid;
2763 dbentry = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
2764 (void *) &dbid,
2765 HASH_FIND, NULL);
2766 if (dbentry != NULL && dbentry->tables != NULL)
2767 {
2768 tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
2769 (void *) &relid,
2770 HASH_FIND, NULL);
2771 if (tabentry)
2772 return tabentry;
2773 }
2774
2775 return NULL;
2776 }
2777
2778
2779 /* ----------
2780 * pgstat_fetch_stat_funcentry() -
2781 *
2782 * Support function for the SQL-callable pgstat* functions. Returns
2783 * the collected statistics for one function or NULL.
2784 * ----------
2785 */
2786 PgStat_StatFuncEntry *
pgstat_fetch_stat_funcentry(Oid func_id)2787 pgstat_fetch_stat_funcentry(Oid func_id)
2788 {
2789 PgStat_StatDBEntry *dbentry;
2790 PgStat_StatFuncEntry *funcentry = NULL;
2791
2792 /* load the stats file if needed */
2793 backend_read_statsfile();
2794
2795 /* Lookup our database, then find the requested function. */
2796 dbentry = pgstat_fetch_stat_dbentry(MyDatabaseId);
2797 if (dbentry != NULL && dbentry->functions != NULL)
2798 {
2799 funcentry = (PgStat_StatFuncEntry *) hash_search(dbentry->functions,
2800 (void *) &func_id,
2801 HASH_FIND, NULL);
2802 }
2803
2804 return funcentry;
2805 }
2806
2807
2808 /*
2809 * ---------
2810 * pgstat_fetch_stat_archiver() -
2811 *
2812 * Support function for the SQL-callable pgstat* functions. Returns
2813 * a pointer to the archiver statistics struct.
2814 * ---------
2815 */
2816 PgStat_ArchiverStats *
pgstat_fetch_stat_archiver(void)2817 pgstat_fetch_stat_archiver(void)
2818 {
2819 backend_read_statsfile();
2820
2821 return &archiverStats;
2822 }
2823
2824
2825 /*
2826 * ---------
2827 * pgstat_fetch_global() -
2828 *
2829 * Support function for the SQL-callable pgstat* functions. Returns
2830 * a pointer to the global statistics struct.
2831 * ---------
2832 */
2833 PgStat_GlobalStats *
pgstat_fetch_global(void)2834 pgstat_fetch_global(void)
2835 {
2836 backend_read_statsfile();
2837
2838 return &globalStats;
2839 }
2840
2841 /*
2842 * ---------
2843 * pgstat_fetch_stat_wal() -
2844 *
2845 * Support function for the SQL-callable pgstat* functions. Returns
2846 * a pointer to the WAL statistics struct.
2847 * ---------
2848 */
2849 PgStat_WalStats *
pgstat_fetch_stat_wal(void)2850 pgstat_fetch_stat_wal(void)
2851 {
2852 backend_read_statsfile();
2853
2854 return &walStats;
2855 }
2856
2857 /*
2858 * ---------
2859 * pgstat_fetch_slru() -
2860 *
2861 * Support function for the SQL-callable pgstat* functions. Returns
2862 * a pointer to the slru statistics struct.
2863 * ---------
2864 */
2865 PgStat_SLRUStats *
pgstat_fetch_slru(void)2866 pgstat_fetch_slru(void)
2867 {
2868 backend_read_statsfile();
2869
2870 return slruStats;
2871 }
2872
2873 /*
2874 * ---------
2875 * pgstat_fetch_replslot() -
2876 *
2877 * Support function for the SQL-callable pgstat* functions. Returns
2878 * a pointer to the replication slot statistics struct.
2879 * ---------
2880 */
2881 PgStat_StatReplSlotEntry *
pgstat_fetch_replslot(NameData slotname)2882 pgstat_fetch_replslot(NameData slotname)
2883 {
2884 backend_read_statsfile();
2885
2886 return pgstat_get_replslot_entry(slotname, false);
2887 }
2888
2889 /*
2890 * Shut down a single backend's statistics reporting at process exit.
2891 *
2892 * Flush any remaining statistics counts out to the collector.
2893 * Without this, operations triggered during backend exit (such as
2894 * temp table deletions) won't be counted.
2895 */
2896 static void
pgstat_shutdown_hook(int code,Datum arg)2897 pgstat_shutdown_hook(int code, Datum arg)
2898 {
2899 /*
2900 * If we got as far as discovering our own database ID, we can report what
2901 * we did to the collector. Otherwise, we'd be sending an invalid
2902 * database ID, so forget it. (This means that accesses to pg_database
2903 * during failed backend starts might never get counted.)
2904 */
2905 if (OidIsValid(MyDatabaseId))
2906 pgstat_report_stat(true);
2907 }
2908
2909 /* ----------
2910 * pgstat_initialize() -
2911 *
2912 * Initialize pgstats state, and set up our on-proc-exit hook.
2913 * Called from InitPostgres and AuxiliaryProcessMain.
2914 *
2915 * NOTE: MyDatabaseId isn't set yet; so the shutdown hook has to be careful.
2916 * ----------
2917 */
2918 void
pgstat_initialize(void)2919 pgstat_initialize(void)
2920 {
2921 /*
2922 * Initialize prevWalUsage with pgWalUsage so that pgstat_send_wal() can
2923 * calculate how much pgWalUsage counters are increased by substracting
2924 * prevWalUsage from pgWalUsage.
2925 */
2926 prevWalUsage = pgWalUsage;
2927
2928 /* Set up a process-exit hook to clean up */
2929 on_shmem_exit(pgstat_shutdown_hook, 0);
2930 }
2931
2932 /* ------------------------------------------------------------
2933 * Local support functions follow
2934 * ------------------------------------------------------------
2935 */
2936
2937
2938 /* ----------
2939 * pgstat_setheader() -
2940 *
2941 * Set common header fields in a statistics message
2942 * ----------
2943 */
2944 static void
pgstat_setheader(PgStat_MsgHdr * hdr,StatMsgType mtype)2945 pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype)
2946 {
2947 hdr->m_type = mtype;
2948 }
2949
2950
2951 /* ----------
2952 * pgstat_send() -
2953 *
2954 * Send out one statistics message to the collector
2955 * ----------
2956 */
2957 static void
pgstat_send(void * msg,int len)2958 pgstat_send(void *msg, int len)
2959 {
2960 int rc;
2961
2962 if (pgStatSock == PGINVALID_SOCKET)
2963 return;
2964
2965 ((PgStat_MsgHdr *) msg)->m_size = len;
2966
2967 /* We'll retry after EINTR, but ignore all other failures */
2968 do
2969 {
2970 rc = send(pgStatSock, msg, len, 0);
2971 } while (rc < 0 && errno == EINTR);
2972
2973 #ifdef USE_ASSERT_CHECKING
2974 /* In debug builds, log send failures ... */
2975 if (rc < 0)
2976 elog(LOG, "could not send to statistics collector: %m");
2977 #endif
2978 }
2979
2980 /* ----------
2981 * pgstat_send_archiver() -
2982 *
2983 * Tell the collector about the WAL file that we successfully
2984 * archived or failed to archive.
2985 * ----------
2986 */
2987 void
pgstat_send_archiver(const char * xlog,bool failed)2988 pgstat_send_archiver(const char *xlog, bool failed)
2989 {
2990 PgStat_MsgArchiver msg;
2991
2992 /*
2993 * Prepare and send the message
2994 */
2995 pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ARCHIVER);
2996 msg.m_failed = failed;
2997 strlcpy(msg.m_xlog, xlog, sizeof(msg.m_xlog));
2998 msg.m_timestamp = GetCurrentTimestamp();
2999 pgstat_send(&msg, sizeof(msg));
3000 }
3001
3002 /* ----------
3003 * pgstat_send_bgwriter() -
3004 *
3005 * Send bgwriter statistics to the collector
3006 * ----------
3007 */
3008 void
pgstat_send_bgwriter(void)3009 pgstat_send_bgwriter(void)
3010 {
3011 /* We assume this initializes to zeroes */
3012 static const PgStat_MsgBgWriter all_zeroes;
3013
3014 /*
3015 * This function can be called even if nothing at all has happened. In
3016 * this case, avoid sending a completely empty message to the stats
3017 * collector.
3018 */
3019 if (memcmp(&BgWriterStats, &all_zeroes, sizeof(PgStat_MsgBgWriter)) == 0)
3020 return;
3021
3022 /*
3023 * Prepare and send the message
3024 */
3025 pgstat_setheader(&BgWriterStats.m_hdr, PGSTAT_MTYPE_BGWRITER);
3026 pgstat_send(&BgWriterStats, sizeof(BgWriterStats));
3027
3028 /*
3029 * Clear out the statistics buffer, so it can be re-used.
3030 */
3031 MemSet(&BgWriterStats, 0, sizeof(BgWriterStats));
3032 }
3033
3034 /* ----------
3035 * pgstat_send_wal() -
3036 *
3037 * Send WAL statistics to the collector.
3038 *
3039 * If 'force' is not set, WAL stats message is only sent if enough time has
3040 * passed since last one was sent to reach PGSTAT_STAT_INTERVAL.
3041 * ----------
3042 */
3043 void
pgstat_send_wal(bool force)3044 pgstat_send_wal(bool force)
3045 {
3046 static TimestampTz sendTime = 0;
3047
3048 /*
3049 * This function can be called even if nothing at all has happened. In
3050 * this case, avoid sending a completely empty message to the stats
3051 * collector.
3052 *
3053 * Check wal_records counter to determine whether any WAL activity has
3054 * happened since last time. Note that other WalUsage counters don't need
3055 * to be checked because they are incremented always together with
3056 * wal_records counter.
3057 *
3058 * m_wal_buffers_full also doesn't need to be checked because it's
3059 * incremented only when at least one WAL record is generated (i.e.,
3060 * wal_records counter is incremented). But for safely, we assert that
3061 * m_wal_buffers_full is always zero when no WAL record is generated
3062 *
3063 * This function can be called by a process like walwriter that normally
3064 * generates no WAL records. To determine whether any WAL activity has
3065 * happened at that process since the last time, the numbers of WAL writes
3066 * and syncs are also checked.
3067 */
3068 if (pgWalUsage.wal_records == prevWalUsage.wal_records &&
3069 WalStats.m_wal_write == 0 && WalStats.m_wal_sync == 0)
3070 {
3071 Assert(WalStats.m_wal_buffers_full == 0);
3072 return;
3073 }
3074
3075 if (!force)
3076 {
3077 TimestampTz now = GetCurrentTimestamp();
3078
3079 /*
3080 * Don't send a message unless it's been at least PGSTAT_STAT_INTERVAL
3081 * msec since we last sent one to avoid overloading the stats
3082 * collector.
3083 */
3084 if (!TimestampDifferenceExceeds(sendTime, now, PGSTAT_STAT_INTERVAL))
3085 return;
3086 sendTime = now;
3087 }
3088
3089 /*
3090 * Set the counters related to generated WAL data if the counters were
3091 * updated.
3092 */
3093 if (pgWalUsage.wal_records != prevWalUsage.wal_records)
3094 {
3095 WalUsage walusage;
3096
3097 /*
3098 * Calculate how much WAL usage counters were increased by
3099 * substracting the previous counters from the current ones. Fill the
3100 * results in WAL stats message.
3101 */
3102 MemSet(&walusage, 0, sizeof(WalUsage));
3103 WalUsageAccumDiff(&walusage, &pgWalUsage, &prevWalUsage);
3104
3105 WalStats.m_wal_records = walusage.wal_records;
3106 WalStats.m_wal_fpi = walusage.wal_fpi;
3107 WalStats.m_wal_bytes = walusage.wal_bytes;
3108
3109 /*
3110 * Save the current counters for the subsequent calculation of WAL
3111 * usage.
3112 */
3113 prevWalUsage = pgWalUsage;
3114 }
3115
3116 /*
3117 * Prepare and send the message
3118 */
3119 pgstat_setheader(&WalStats.m_hdr, PGSTAT_MTYPE_WAL);
3120 pgstat_send(&WalStats, sizeof(WalStats));
3121
3122 /*
3123 * Clear out the statistics buffer, so it can be re-used.
3124 */
3125 MemSet(&WalStats, 0, sizeof(WalStats));
3126 }
3127
3128 /* ----------
3129 * pgstat_send_slru() -
3130 *
3131 * Send SLRU statistics to the collector
3132 * ----------
3133 */
3134 static void
pgstat_send_slru(void)3135 pgstat_send_slru(void)
3136 {
3137 /* We assume this initializes to zeroes */
3138 static const PgStat_MsgSLRU all_zeroes;
3139
3140 for (int i = 0; i < SLRU_NUM_ELEMENTS; i++)
3141 {
3142 /*
3143 * This function can be called even if nothing at all has happened. In
3144 * this case, avoid sending a completely empty message to the stats
3145 * collector.
3146 */
3147 if (memcmp(&SLRUStats[i], &all_zeroes, sizeof(PgStat_MsgSLRU)) == 0)
3148 continue;
3149
3150 /* set the SLRU type before each send */
3151 SLRUStats[i].m_index = i;
3152
3153 /*
3154 * Prepare and send the message
3155 */
3156 pgstat_setheader(&SLRUStats[i].m_hdr, PGSTAT_MTYPE_SLRU);
3157 pgstat_send(&SLRUStats[i], sizeof(PgStat_MsgSLRU));
3158
3159 /*
3160 * Clear out the statistics buffer, so it can be re-used.
3161 */
3162 MemSet(&SLRUStats[i], 0, sizeof(PgStat_MsgSLRU));
3163 }
3164 }
3165
3166
3167 /* ----------
3168 * PgstatCollectorMain() -
3169 *
3170 * Start up the statistics collector process. This is the body of the
3171 * postmaster child process.
3172 *
3173 * The argc/argv parameters are valid only in EXEC_BACKEND case.
3174 * ----------
3175 */
3176 NON_EXEC_STATIC void
PgstatCollectorMain(int argc,char * argv[])3177 PgstatCollectorMain(int argc, char *argv[])
3178 {
3179 int len;
3180 PgStat_Msg msg;
3181 int wr;
3182 WaitEvent event;
3183 WaitEventSet *wes;
3184
3185 /*
3186 * Ignore all signals usually bound to some action in the postmaster,
3187 * except SIGHUP and SIGQUIT. Note we don't need a SIGUSR1 handler to
3188 * support latch operations, because we only use a local latch.
3189 */
3190 pqsignal(SIGHUP, SignalHandlerForConfigReload);
3191 pqsignal(SIGINT, SIG_IGN);
3192 pqsignal(SIGTERM, SIG_IGN);
3193 pqsignal(SIGQUIT, SignalHandlerForShutdownRequest);
3194 pqsignal(SIGALRM, SIG_IGN);
3195 pqsignal(SIGPIPE, SIG_IGN);
3196 pqsignal(SIGUSR1, SIG_IGN);
3197 pqsignal(SIGUSR2, SIG_IGN);
3198 /* Reset some signals that are accepted by postmaster but not here */
3199 pqsignal(SIGCHLD, SIG_DFL);
3200 PG_SETMASK(&UnBlockSig);
3201
3202 MyBackendType = B_STATS_COLLECTOR;
3203 init_ps_display(NULL);
3204
3205 /*
3206 * Read in existing stats files or initialize the stats to zero.
3207 */
3208 pgStatRunningInCollector = true;
3209 pgStatDBHash = pgstat_read_statsfiles(InvalidOid, true, true);
3210
3211 /* Prepare to wait for our latch or data in our socket. */
3212 wes = CreateWaitEventSet(CurrentMemoryContext, 3);
3213 AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
3214 AddWaitEventToSet(wes, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL);
3215 AddWaitEventToSet(wes, WL_SOCKET_READABLE, pgStatSock, NULL, NULL);
3216
3217 /*
3218 * Loop to process messages until we get SIGQUIT or detect ungraceful
3219 * death of our parent postmaster.
3220 *
3221 * For performance reasons, we don't want to do ResetLatch/WaitLatch after
3222 * every message; instead, do that only after a recv() fails to obtain a
3223 * message. (This effectively means that if backends are sending us stuff
3224 * like mad, we won't notice postmaster death until things slack off a
3225 * bit; which seems fine.) To do that, we have an inner loop that
3226 * iterates as long as recv() succeeds. We do check ConfigReloadPending
3227 * inside the inner loop, which means that such interrupts will get
3228 * serviced but the latch won't get cleared until next time there is a
3229 * break in the action.
3230 */
3231 for (;;)
3232 {
3233 /* Clear any already-pending wakeups */
3234 ResetLatch(MyLatch);
3235
3236 /*
3237 * Quit if we get SIGQUIT from the postmaster.
3238 */
3239 if (ShutdownRequestPending)
3240 break;
3241
3242 /*
3243 * Inner loop iterates as long as we keep getting messages, or until
3244 * ShutdownRequestPending becomes set.
3245 */
3246 while (!ShutdownRequestPending)
3247 {
3248 /*
3249 * Reload configuration if we got SIGHUP from the postmaster.
3250 */
3251 if (ConfigReloadPending)
3252 {
3253 ConfigReloadPending = false;
3254 ProcessConfigFile(PGC_SIGHUP);
3255 }
3256
3257 /*
3258 * Write the stats file(s) if a new request has arrived that is
3259 * not satisfied by existing file(s).
3260 */
3261 if (pgstat_write_statsfile_needed())
3262 pgstat_write_statsfiles(false, false);
3263
3264 /*
3265 * Try to receive and process a message. This will not block,
3266 * since the socket is set to non-blocking mode.
3267 *
3268 * XXX On Windows, we have to force pgwin32_recv to cooperate,
3269 * despite the previous use of pg_set_noblock() on the socket.
3270 * This is extremely broken and should be fixed someday.
3271 */
3272 #ifdef WIN32
3273 pgwin32_noblock = 1;
3274 #endif
3275
3276 len = recv(pgStatSock, (char *) &msg,
3277 sizeof(PgStat_Msg), 0);
3278
3279 #ifdef WIN32
3280 pgwin32_noblock = 0;
3281 #endif
3282
3283 if (len < 0)
3284 {
3285 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
3286 break; /* out of inner loop */
3287 ereport(ERROR,
3288 (errcode_for_socket_access(),
3289 errmsg("could not read statistics message: %m")));
3290 }
3291
3292 /*
3293 * We ignore messages that are smaller than our common header
3294 */
3295 if (len < sizeof(PgStat_MsgHdr))
3296 continue;
3297
3298 /*
3299 * The received length must match the length in the header
3300 */
3301 if (msg.msg_hdr.m_size != len)
3302 continue;
3303
3304 /*
3305 * O.K. - we accept this message. Process it.
3306 */
3307 switch (msg.msg_hdr.m_type)
3308 {
3309 case PGSTAT_MTYPE_DUMMY:
3310 break;
3311
3312 case PGSTAT_MTYPE_INQUIRY:
3313 pgstat_recv_inquiry(&msg.msg_inquiry, len);
3314 break;
3315
3316 case PGSTAT_MTYPE_TABSTAT:
3317 pgstat_recv_tabstat(&msg.msg_tabstat, len);
3318 break;
3319
3320 case PGSTAT_MTYPE_TABPURGE:
3321 pgstat_recv_tabpurge(&msg.msg_tabpurge, len);
3322 break;
3323
3324 case PGSTAT_MTYPE_DROPDB:
3325 pgstat_recv_dropdb(&msg.msg_dropdb, len);
3326 break;
3327
3328 case PGSTAT_MTYPE_RESETCOUNTER:
3329 pgstat_recv_resetcounter(&msg.msg_resetcounter, len);
3330 break;
3331
3332 case PGSTAT_MTYPE_RESETSHAREDCOUNTER:
3333 pgstat_recv_resetsharedcounter(&msg.msg_resetsharedcounter,
3334 len);
3335 break;
3336
3337 case PGSTAT_MTYPE_RESETSINGLECOUNTER:
3338 pgstat_recv_resetsinglecounter(&msg.msg_resetsinglecounter,
3339 len);
3340 break;
3341
3342 case PGSTAT_MTYPE_RESETSLRUCOUNTER:
3343 pgstat_recv_resetslrucounter(&msg.msg_resetslrucounter,
3344 len);
3345 break;
3346
3347 case PGSTAT_MTYPE_RESETREPLSLOTCOUNTER:
3348 pgstat_recv_resetreplslotcounter(&msg.msg_resetreplslotcounter,
3349 len);
3350 break;
3351
3352 case PGSTAT_MTYPE_AUTOVAC_START:
3353 pgstat_recv_autovac(&msg.msg_autovacuum_start, len);
3354 break;
3355
3356 case PGSTAT_MTYPE_VACUUM:
3357 pgstat_recv_vacuum(&msg.msg_vacuum, len);
3358 break;
3359
3360 case PGSTAT_MTYPE_ANALYZE:
3361 pgstat_recv_analyze(&msg.msg_analyze, len);
3362 break;
3363
3364 case PGSTAT_MTYPE_ARCHIVER:
3365 pgstat_recv_archiver(&msg.msg_archiver, len);
3366 break;
3367
3368 case PGSTAT_MTYPE_BGWRITER:
3369 pgstat_recv_bgwriter(&msg.msg_bgwriter, len);
3370 break;
3371
3372 case PGSTAT_MTYPE_WAL:
3373 pgstat_recv_wal(&msg.msg_wal, len);
3374 break;
3375
3376 case PGSTAT_MTYPE_SLRU:
3377 pgstat_recv_slru(&msg.msg_slru, len);
3378 break;
3379
3380 case PGSTAT_MTYPE_FUNCSTAT:
3381 pgstat_recv_funcstat(&msg.msg_funcstat, len);
3382 break;
3383
3384 case PGSTAT_MTYPE_FUNCPURGE:
3385 pgstat_recv_funcpurge(&msg.msg_funcpurge, len);
3386 break;
3387
3388 case PGSTAT_MTYPE_RECOVERYCONFLICT:
3389 pgstat_recv_recoveryconflict(&msg.msg_recoveryconflict,
3390 len);
3391 break;
3392
3393 case PGSTAT_MTYPE_DEADLOCK:
3394 pgstat_recv_deadlock(&msg.msg_deadlock, len);
3395 break;
3396
3397 case PGSTAT_MTYPE_TEMPFILE:
3398 pgstat_recv_tempfile(&msg.msg_tempfile, len);
3399 break;
3400
3401 case PGSTAT_MTYPE_CHECKSUMFAILURE:
3402 pgstat_recv_checksum_failure(&msg.msg_checksumfailure,
3403 len);
3404 break;
3405
3406 case PGSTAT_MTYPE_REPLSLOT:
3407 pgstat_recv_replslot(&msg.msg_replslot, len);
3408 break;
3409
3410 case PGSTAT_MTYPE_CONNECT:
3411 pgstat_recv_connect(&msg.msg_connect, len);
3412 break;
3413
3414 case PGSTAT_MTYPE_DISCONNECT:
3415 pgstat_recv_disconnect(&msg.msg_disconnect, len);
3416 break;
3417
3418 default:
3419 break;
3420 }
3421 } /* end of inner message-processing loop */
3422
3423 /* Sleep until there's something to do */
3424 #ifndef WIN32
3425 wr = WaitEventSetWait(wes, -1L, &event, 1, WAIT_EVENT_PGSTAT_MAIN);
3426 #else
3427
3428 /*
3429 * Windows, at least in its Windows Server 2003 R2 incarnation,
3430 * sometimes loses FD_READ events. Waking up and retrying the recv()
3431 * fixes that, so don't sleep indefinitely. This is a crock of the
3432 * first water, but until somebody wants to debug exactly what's
3433 * happening there, this is the best we can do. The two-second
3434 * timeout matches our pre-9.2 behavior, and needs to be short enough
3435 * to not provoke "using stale statistics" complaints from
3436 * backend_read_statsfile.
3437 */
3438 wr = WaitEventSetWait(wes, 2 * 1000L /* msec */ , &event, 1,
3439 WAIT_EVENT_PGSTAT_MAIN);
3440 #endif
3441
3442 /*
3443 * Emergency bailout if postmaster has died. This is to avoid the
3444 * necessity for manual cleanup of all postmaster children.
3445 */
3446 if (wr == 1 && event.events == WL_POSTMASTER_DEATH)
3447 break;
3448 } /* end of outer loop */
3449
3450 /*
3451 * Save the final stats to reuse at next startup.
3452 */
3453 pgstat_write_statsfiles(true, true);
3454
3455 FreeWaitEventSet(wes);
3456
3457 exit(0);
3458 }
3459
3460 /*
3461 * Subroutine to clear stats in a database entry
3462 *
3463 * Tables and functions hashes are initialized to empty.
3464 */
3465 static void
reset_dbentry_counters(PgStat_StatDBEntry * dbentry)3466 reset_dbentry_counters(PgStat_StatDBEntry *dbentry)
3467 {
3468 HASHCTL hash_ctl;
3469
3470 dbentry->n_xact_commit = 0;
3471 dbentry->n_xact_rollback = 0;
3472 dbentry->n_blocks_fetched = 0;
3473 dbentry->n_blocks_hit = 0;
3474 dbentry->n_tuples_returned = 0;
3475 dbentry->n_tuples_fetched = 0;
3476 dbentry->n_tuples_inserted = 0;
3477 dbentry->n_tuples_updated = 0;
3478 dbentry->n_tuples_deleted = 0;
3479 dbentry->last_autovac_time = 0;
3480 dbentry->n_conflict_tablespace = 0;
3481 dbentry->n_conflict_lock = 0;
3482 dbentry->n_conflict_snapshot = 0;
3483 dbentry->n_conflict_bufferpin = 0;
3484 dbentry->n_conflict_startup_deadlock = 0;
3485 dbentry->n_temp_files = 0;
3486 dbentry->n_temp_bytes = 0;
3487 dbentry->n_deadlocks = 0;
3488 dbentry->n_checksum_failures = 0;
3489 dbentry->last_checksum_failure = 0;
3490 dbentry->n_block_read_time = 0;
3491 dbentry->n_block_write_time = 0;
3492 dbentry->n_sessions = 0;
3493 dbentry->total_session_time = 0;
3494 dbentry->total_active_time = 0;
3495 dbentry->total_idle_in_xact_time = 0;
3496 dbentry->n_sessions_abandoned = 0;
3497 dbentry->n_sessions_fatal = 0;
3498 dbentry->n_sessions_killed = 0;
3499
3500 dbentry->stat_reset_timestamp = GetCurrentTimestamp();
3501 dbentry->stats_timestamp = 0;
3502
3503 hash_ctl.keysize = sizeof(Oid);
3504 hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
3505 dbentry->tables = hash_create("Per-database table",
3506 PGSTAT_TAB_HASH_SIZE,
3507 &hash_ctl,
3508 HASH_ELEM | HASH_BLOBS);
3509
3510 hash_ctl.keysize = sizeof(Oid);
3511 hash_ctl.entrysize = sizeof(PgStat_StatFuncEntry);
3512 dbentry->functions = hash_create("Per-database function",
3513 PGSTAT_FUNCTION_HASH_SIZE,
3514 &hash_ctl,
3515 HASH_ELEM | HASH_BLOBS);
3516 }
3517
3518 /*
3519 * Lookup the hash table entry for the specified database. If no hash
3520 * table entry exists, initialize it, if the create parameter is true.
3521 * Else, return NULL.
3522 */
3523 static PgStat_StatDBEntry *
pgstat_get_db_entry(Oid databaseid,bool create)3524 pgstat_get_db_entry(Oid databaseid, bool create)
3525 {
3526 PgStat_StatDBEntry *result;
3527 bool found;
3528 HASHACTION action = (create ? HASH_ENTER : HASH_FIND);
3529
3530 /* Lookup or create the hash table entry for this database */
3531 result = (PgStat_StatDBEntry *) hash_search(pgStatDBHash,
3532 &databaseid,
3533 action, &found);
3534
3535 if (!create && !found)
3536 return NULL;
3537
3538 /*
3539 * If not found, initialize the new one. This creates empty hash tables
3540 * for tables and functions, too.
3541 */
3542 if (!found)
3543 reset_dbentry_counters(result);
3544
3545 return result;
3546 }
3547
3548
3549 /*
3550 * Lookup the hash table entry for the specified table. If no hash
3551 * table entry exists, initialize it, if the create parameter is true.
3552 * Else, return NULL.
3553 */
3554 static PgStat_StatTabEntry *
pgstat_get_tab_entry(PgStat_StatDBEntry * dbentry,Oid tableoid,bool create)3555 pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, Oid tableoid, bool create)
3556 {
3557 PgStat_StatTabEntry *result;
3558 bool found;
3559 HASHACTION action = (create ? HASH_ENTER : HASH_FIND);
3560
3561 /* Lookup or create the hash table entry for this table */
3562 result = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
3563 &tableoid,
3564 action, &found);
3565
3566 if (!create && !found)
3567 return NULL;
3568
3569 /* If not found, initialize the new one. */
3570 if (!found)
3571 {
3572 result->numscans = 0;
3573 result->tuples_returned = 0;
3574 result->tuples_fetched = 0;
3575 result->tuples_inserted = 0;
3576 result->tuples_updated = 0;
3577 result->tuples_deleted = 0;
3578 result->tuples_hot_updated = 0;
3579 result->n_live_tuples = 0;
3580 result->n_dead_tuples = 0;
3581 result->changes_since_analyze = 0;
3582 result->inserts_since_vacuum = 0;
3583 result->blocks_fetched = 0;
3584 result->blocks_hit = 0;
3585 result->vacuum_timestamp = 0;
3586 result->vacuum_count = 0;
3587 result->autovac_vacuum_timestamp = 0;
3588 result->autovac_vacuum_count = 0;
3589 result->analyze_timestamp = 0;
3590 result->analyze_count = 0;
3591 result->autovac_analyze_timestamp = 0;
3592 result->autovac_analyze_count = 0;
3593 }
3594
3595 return result;
3596 }
3597
3598
3599 /* ----------
3600 * pgstat_write_statsfiles() -
3601 * Write the global statistics file, as well as requested DB files.
3602 *
3603 * 'permanent' specifies writing to the permanent files not temporary ones.
3604 * When true (happens only when the collector is shutting down), also remove
3605 * the temporary files so that backends starting up under a new postmaster
3606 * can't read old data before the new collector is ready.
3607 *
3608 * When 'allDbs' is false, only the requested databases (listed in
3609 * pending_write_requests) will be written; otherwise, all databases
3610 * will be written.
3611 * ----------
3612 */
3613 static void
pgstat_write_statsfiles(bool permanent,bool allDbs)3614 pgstat_write_statsfiles(bool permanent, bool allDbs)
3615 {
3616 HASH_SEQ_STATUS hstat;
3617 PgStat_StatDBEntry *dbentry;
3618 FILE *fpout;
3619 int32 format_id;
3620 const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname;
3621 const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
3622 int rc;
3623
3624 elog(DEBUG2, "writing stats file \"%s\"", statfile);
3625
3626 /*
3627 * Open the statistics temp file to write out the current values.
3628 */
3629 fpout = AllocateFile(tmpfile, PG_BINARY_W);
3630 if (fpout == NULL)
3631 {
3632 ereport(LOG,
3633 (errcode_for_file_access(),
3634 errmsg("could not open temporary statistics file \"%s\": %m",
3635 tmpfile)));
3636 return;
3637 }
3638
3639 /*
3640 * Set the timestamp of the stats file.
3641 */
3642 globalStats.stats_timestamp = GetCurrentTimestamp();
3643
3644 /*
3645 * Write the file header --- currently just a format ID.
3646 */
3647 format_id = PGSTAT_FILE_FORMAT_ID;
3648 rc = fwrite(&format_id, sizeof(format_id), 1, fpout);
3649 (void) rc; /* we'll check for error with ferror */
3650
3651 /*
3652 * Write global stats struct
3653 */
3654 rc = fwrite(&globalStats, sizeof(globalStats), 1, fpout);
3655 (void) rc; /* we'll check for error with ferror */
3656
3657 /*
3658 * Write archiver stats struct
3659 */
3660 rc = fwrite(&archiverStats, sizeof(archiverStats), 1, fpout);
3661 (void) rc; /* we'll check for error with ferror */
3662
3663 /*
3664 * Write WAL stats struct
3665 */
3666 rc = fwrite(&walStats, sizeof(walStats), 1, fpout);
3667 (void) rc; /* we'll check for error with ferror */
3668
3669 /*
3670 * Write SLRU stats struct
3671 */
3672 rc = fwrite(slruStats, sizeof(slruStats), 1, fpout);
3673 (void) rc; /* we'll check for error with ferror */
3674
3675 /*
3676 * Walk through the database table.
3677 */
3678 hash_seq_init(&hstat, pgStatDBHash);
3679 while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
3680 {
3681 /*
3682 * Write out the table and function stats for this DB into the
3683 * appropriate per-DB stat file, if required.
3684 */
3685 if (allDbs || pgstat_db_requested(dbentry->databaseid))
3686 {
3687 /* Make DB's timestamp consistent with the global stats */
3688 dbentry->stats_timestamp = globalStats.stats_timestamp;
3689
3690 pgstat_write_db_statsfile(dbentry, permanent);
3691 }
3692
3693 /*
3694 * Write out the DB entry. We don't write the tables or functions
3695 * pointers, since they're of no use to any other process.
3696 */
3697 fputc('D', fpout);
3698 rc = fwrite(dbentry, offsetof(PgStat_StatDBEntry, tables), 1, fpout);
3699 (void) rc; /* we'll check for error with ferror */
3700 }
3701
3702 /*
3703 * Write replication slot stats struct
3704 */
3705 if (replSlotStatHash)
3706 {
3707 PgStat_StatReplSlotEntry *slotent;
3708
3709 hash_seq_init(&hstat, replSlotStatHash);
3710 while ((slotent = (PgStat_StatReplSlotEntry *) hash_seq_search(&hstat)) != NULL)
3711 {
3712 fputc('R', fpout);
3713 rc = fwrite(slotent, sizeof(PgStat_StatReplSlotEntry), 1, fpout);
3714 (void) rc; /* we'll check for error with ferror */
3715 }
3716 }
3717
3718 /*
3719 * No more output to be done. Close the temp file and replace the old
3720 * pgstat.stat with it. The ferror() check replaces testing for error
3721 * after each individual fputc or fwrite above.
3722 */
3723 fputc('E', fpout);
3724
3725 if (ferror(fpout))
3726 {
3727 ereport(LOG,
3728 (errcode_for_file_access(),
3729 errmsg("could not write temporary statistics file \"%s\": %m",
3730 tmpfile)));
3731 FreeFile(fpout);
3732 unlink(tmpfile);
3733 }
3734 else if (FreeFile(fpout) < 0)
3735 {
3736 ereport(LOG,
3737 (errcode_for_file_access(),
3738 errmsg("could not close temporary statistics file \"%s\": %m",
3739 tmpfile)));
3740 unlink(tmpfile);
3741 }
3742 else if (rename(tmpfile, statfile) < 0)
3743 {
3744 ereport(LOG,
3745 (errcode_for_file_access(),
3746 errmsg("could not rename temporary statistics file \"%s\" to \"%s\": %m",
3747 tmpfile, statfile)));
3748 unlink(tmpfile);
3749 }
3750
3751 if (permanent)
3752 unlink(pgstat_stat_filename);
3753
3754 /*
3755 * Now throw away the list of requests. Note that requests sent after we
3756 * started the write are still waiting on the network socket.
3757 */
3758 list_free(pending_write_requests);
3759 pending_write_requests = NIL;
3760 }
3761
3762 /*
3763 * return the filename for a DB stat file; filename is the output buffer,
3764 * of length len.
3765 */
3766 static void
get_dbstat_filename(bool permanent,bool tempname,Oid databaseid,char * filename,int len)3767 get_dbstat_filename(bool permanent, bool tempname, Oid databaseid,
3768 char *filename, int len)
3769 {
3770 int printed;
3771
3772 /* NB -- pgstat_reset_remove_files knows about the pattern this uses */
3773 printed = snprintf(filename, len, "%s/db_%u.%s",
3774 permanent ? PGSTAT_STAT_PERMANENT_DIRECTORY :
3775 pgstat_stat_directory,
3776 databaseid,
3777 tempname ? "tmp" : "stat");
3778 if (printed >= len)
3779 elog(ERROR, "overlength pgstat path");
3780 }
3781
3782 /* ----------
3783 * pgstat_write_db_statsfile() -
3784 * Write the stat file for a single database.
3785 *
3786 * If writing to the permanent file (happens when the collector is
3787 * shutting down only), remove the temporary file so that backends
3788 * starting up under a new postmaster can't read the old data before
3789 * the new collector is ready.
3790 * ----------
3791 */
3792 static void
pgstat_write_db_statsfile(PgStat_StatDBEntry * dbentry,bool permanent)3793 pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent)
3794 {
3795 HASH_SEQ_STATUS tstat;
3796 HASH_SEQ_STATUS fstat;
3797 PgStat_StatTabEntry *tabentry;
3798 PgStat_StatFuncEntry *funcentry;
3799 FILE *fpout;
3800 int32 format_id;
3801 Oid dbid = dbentry->databaseid;
3802 int rc;
3803 char tmpfile[MAXPGPATH];
3804 char statfile[MAXPGPATH];
3805
3806 get_dbstat_filename(permanent, true, dbid, tmpfile, MAXPGPATH);
3807 get_dbstat_filename(permanent, false, dbid, statfile, MAXPGPATH);
3808
3809 elog(DEBUG2, "writing stats file \"%s\"", statfile);
3810
3811 /*
3812 * Open the statistics temp file to write out the current values.
3813 */
3814 fpout = AllocateFile(tmpfile, PG_BINARY_W);
3815 if (fpout == NULL)
3816 {
3817 ereport(LOG,
3818 (errcode_for_file_access(),
3819 errmsg("could not open temporary statistics file \"%s\": %m",
3820 tmpfile)));
3821 return;
3822 }
3823
3824 /*
3825 * Write the file header --- currently just a format ID.
3826 */
3827 format_id = PGSTAT_FILE_FORMAT_ID;
3828 rc = fwrite(&format_id, sizeof(format_id), 1, fpout);
3829 (void) rc; /* we'll check for error with ferror */
3830
3831 /*
3832 * Walk through the database's access stats per table.
3833 */
3834 hash_seq_init(&tstat, dbentry->tables);
3835 while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&tstat)) != NULL)
3836 {
3837 fputc('T', fpout);
3838 rc = fwrite(tabentry, sizeof(PgStat_StatTabEntry), 1, fpout);
3839 (void) rc; /* we'll check for error with ferror */
3840 }
3841
3842 /*
3843 * Walk through the database's function stats table.
3844 */
3845 hash_seq_init(&fstat, dbentry->functions);
3846 while ((funcentry = (PgStat_StatFuncEntry *) hash_seq_search(&fstat)) != NULL)
3847 {
3848 fputc('F', fpout);
3849 rc = fwrite(funcentry, sizeof(PgStat_StatFuncEntry), 1, fpout);
3850 (void) rc; /* we'll check for error with ferror */
3851 }
3852
3853 /*
3854 * No more output to be done. Close the temp file and replace the old
3855 * pgstat.stat with it. The ferror() check replaces testing for error
3856 * after each individual fputc or fwrite above.
3857 */
3858 fputc('E', fpout);
3859
3860 if (ferror(fpout))
3861 {
3862 ereport(LOG,
3863 (errcode_for_file_access(),
3864 errmsg("could not write temporary statistics file \"%s\": %m",
3865 tmpfile)));
3866 FreeFile(fpout);
3867 unlink(tmpfile);
3868 }
3869 else if (FreeFile(fpout) < 0)
3870 {
3871 ereport(LOG,
3872 (errcode_for_file_access(),
3873 errmsg("could not close temporary statistics file \"%s\": %m",
3874 tmpfile)));
3875 unlink(tmpfile);
3876 }
3877 else if (rename(tmpfile, statfile) < 0)
3878 {
3879 ereport(LOG,
3880 (errcode_for_file_access(),
3881 errmsg("could not rename temporary statistics file \"%s\" to \"%s\": %m",
3882 tmpfile, statfile)));
3883 unlink(tmpfile);
3884 }
3885
3886 if (permanent)
3887 {
3888 get_dbstat_filename(false, false, dbid, statfile, MAXPGPATH);
3889
3890 elog(DEBUG2, "removing temporary stats file \"%s\"", statfile);
3891 unlink(statfile);
3892 }
3893 }
3894
3895 /* ----------
3896 * pgstat_read_statsfiles() -
3897 *
3898 * Reads in some existing statistics collector files and returns the
3899 * databases hash table that is the top level of the data.
3900 *
3901 * If 'onlydb' is not InvalidOid, it means we only want data for that DB
3902 * plus the shared catalogs ("DB 0"). We'll still populate the DB hash
3903 * table for all databases, but we don't bother even creating table/function
3904 * hash tables for other databases.
3905 *
3906 * 'permanent' specifies reading from the permanent files not temporary ones.
3907 * When true (happens only when the collector is starting up), remove the
3908 * files after reading; the in-memory status is now authoritative, and the
3909 * files would be out of date in case somebody else reads them.
3910 *
3911 * If a 'deep' read is requested, table/function stats are read, otherwise
3912 * the table/function hash tables remain empty.
3913 * ----------
3914 */
3915 static HTAB *
pgstat_read_statsfiles(Oid onlydb,bool permanent,bool deep)3916 pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
3917 {
3918 PgStat_StatDBEntry *dbentry;
3919 PgStat_StatDBEntry dbbuf;
3920 HASHCTL hash_ctl;
3921 HTAB *dbhash;
3922 FILE *fpin;
3923 int32 format_id;
3924 bool found;
3925 const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
3926 int i;
3927
3928 /*
3929 * The tables will live in pgStatLocalContext.
3930 */
3931 pgstat_setup_memcxt();
3932
3933 /*
3934 * Create the DB hashtable
3935 */
3936 hash_ctl.keysize = sizeof(Oid);
3937 hash_ctl.entrysize = sizeof(PgStat_StatDBEntry);
3938 hash_ctl.hcxt = pgStatLocalContext;
3939 dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
3940 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
3941
3942 /*
3943 * Clear out global, archiver, WAL and SLRU statistics so they start from
3944 * zero in case we can't load an existing statsfile.
3945 */
3946 memset(&globalStats, 0, sizeof(globalStats));
3947 memset(&archiverStats, 0, sizeof(archiverStats));
3948 memset(&walStats, 0, sizeof(walStats));
3949 memset(&slruStats, 0, sizeof(slruStats));
3950
3951 /*
3952 * Set the current timestamp (will be kept only in case we can't load an
3953 * existing statsfile).
3954 */
3955 globalStats.stat_reset_timestamp = GetCurrentTimestamp();
3956 archiverStats.stat_reset_timestamp = globalStats.stat_reset_timestamp;
3957 walStats.stat_reset_timestamp = globalStats.stat_reset_timestamp;
3958
3959 /*
3960 * Set the same reset timestamp for all SLRU items too.
3961 */
3962 for (i = 0; i < SLRU_NUM_ELEMENTS; i++)
3963 slruStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp;
3964
3965 /*
3966 * Try to open the stats file. If it doesn't exist, the backends simply
3967 * return zero for anything and the collector simply starts from scratch
3968 * with empty counters.
3969 *
3970 * ENOENT is a possibility if the stats collector is not running or has
3971 * not yet written the stats file the first time. Any other failure
3972 * condition is suspicious.
3973 */
3974 if ((fpin = AllocateFile(statfile, PG_BINARY_R)) == NULL)
3975 {
3976 if (errno != ENOENT)
3977 ereport(pgStatRunningInCollector ? LOG : WARNING,
3978 (errcode_for_file_access(),
3979 errmsg("could not open statistics file \"%s\": %m",
3980 statfile)));
3981 return dbhash;
3982 }
3983
3984 /*
3985 * Verify it's of the expected format.
3986 */
3987 if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) ||
3988 format_id != PGSTAT_FILE_FORMAT_ID)
3989 {
3990 ereport(pgStatRunningInCollector ? LOG : WARNING,
3991 (errmsg("corrupted statistics file \"%s\"", statfile)));
3992 goto done;
3993 }
3994
3995 /*
3996 * Read global stats struct
3997 */
3998 if (fread(&globalStats, 1, sizeof(globalStats), fpin) != sizeof(globalStats))
3999 {
4000 ereport(pgStatRunningInCollector ? LOG : WARNING,
4001 (errmsg("corrupted statistics file \"%s\"", statfile)));
4002 memset(&globalStats, 0, sizeof(globalStats));
4003 goto done;
4004 }
4005
4006 /*
4007 * In the collector, disregard the timestamp we read from the permanent
4008 * stats file; we should be willing to write a temp stats file immediately
4009 * upon the first request from any backend. This only matters if the old
4010 * file's timestamp is less than PGSTAT_STAT_INTERVAL ago, but that's not
4011 * an unusual scenario.
4012 */
4013 if (pgStatRunningInCollector)
4014 globalStats.stats_timestamp = 0;
4015
4016 /*
4017 * Read archiver stats struct
4018 */
4019 if (fread(&archiverStats, 1, sizeof(archiverStats), fpin) != sizeof(archiverStats))
4020 {
4021 ereport(pgStatRunningInCollector ? LOG : WARNING,
4022 (errmsg("corrupted statistics file \"%s\"", statfile)));
4023 memset(&archiverStats, 0, sizeof(archiverStats));
4024 goto done;
4025 }
4026
4027 /*
4028 * Read WAL stats struct
4029 */
4030 if (fread(&walStats, 1, sizeof(walStats), fpin) != sizeof(walStats))
4031 {
4032 ereport(pgStatRunningInCollector ? LOG : WARNING,
4033 (errmsg("corrupted statistics file \"%s\"", statfile)));
4034 memset(&walStats, 0, sizeof(walStats));
4035 goto done;
4036 }
4037
4038 /*
4039 * Read SLRU stats struct
4040 */
4041 if (fread(slruStats, 1, sizeof(slruStats), fpin) != sizeof(slruStats))
4042 {
4043 ereport(pgStatRunningInCollector ? LOG : WARNING,
4044 (errmsg("corrupted statistics file \"%s\"", statfile)));
4045 memset(&slruStats, 0, sizeof(slruStats));
4046 goto done;
4047 }
4048
4049 /*
4050 * We found an existing collector stats file. Read it and put all the
4051 * hashtable entries into place.
4052 */
4053 for (;;)
4054 {
4055 switch (fgetc(fpin))
4056 {
4057 /*
4058 * 'D' A PgStat_StatDBEntry struct describing a database
4059 * follows.
4060 */
4061 case 'D':
4062 if (fread(&dbbuf, 1, offsetof(PgStat_StatDBEntry, tables),
4063 fpin) != offsetof(PgStat_StatDBEntry, tables))
4064 {
4065 ereport(pgStatRunningInCollector ? LOG : WARNING,
4066 (errmsg("corrupted statistics file \"%s\"",
4067 statfile)));
4068 goto done;
4069 }
4070
4071 /*
4072 * Add to the DB hash
4073 */
4074 dbentry = (PgStat_StatDBEntry *) hash_search(dbhash,
4075 (void *) &dbbuf.databaseid,
4076 HASH_ENTER,
4077 &found);
4078 if (found)
4079 {
4080 ereport(pgStatRunningInCollector ? LOG : WARNING,
4081 (errmsg("corrupted statistics file \"%s\"",
4082 statfile)));
4083 goto done;
4084 }
4085
4086 memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry));
4087 dbentry->tables = NULL;
4088 dbentry->functions = NULL;
4089
4090 /*
4091 * In the collector, disregard the timestamp we read from the
4092 * permanent stats file; we should be willing to write a temp
4093 * stats file immediately upon the first request from any
4094 * backend.
4095 */
4096 if (pgStatRunningInCollector)
4097 dbentry->stats_timestamp = 0;
4098
4099 /*
4100 * Don't create tables/functions hashtables for uninteresting
4101 * databases.
4102 */
4103 if (onlydb != InvalidOid)
4104 {
4105 if (dbbuf.databaseid != onlydb &&
4106 dbbuf.databaseid != InvalidOid)
4107 break;
4108 }
4109
4110 hash_ctl.keysize = sizeof(Oid);
4111 hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
4112 hash_ctl.hcxt = pgStatLocalContext;
4113 dbentry->tables = hash_create("Per-database table",
4114 PGSTAT_TAB_HASH_SIZE,
4115 &hash_ctl,
4116 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
4117
4118 hash_ctl.keysize = sizeof(Oid);
4119 hash_ctl.entrysize = sizeof(PgStat_StatFuncEntry);
4120 hash_ctl.hcxt = pgStatLocalContext;
4121 dbentry->functions = hash_create("Per-database function",
4122 PGSTAT_FUNCTION_HASH_SIZE,
4123 &hash_ctl,
4124 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
4125
4126 /*
4127 * If requested, read the data from the database-specific
4128 * file. Otherwise we just leave the hashtables empty.
4129 */
4130 if (deep)
4131 pgstat_read_db_statsfile(dbentry->databaseid,
4132 dbentry->tables,
4133 dbentry->functions,
4134 permanent);
4135
4136 break;
4137
4138 /*
4139 * 'R' A PgStat_StatReplSlotEntry struct describing a
4140 * replication slot follows.
4141 */
4142 case 'R':
4143 {
4144 PgStat_StatReplSlotEntry slotbuf;
4145 PgStat_StatReplSlotEntry *slotent;
4146
4147 if (fread(&slotbuf, 1, sizeof(PgStat_StatReplSlotEntry), fpin)
4148 != sizeof(PgStat_StatReplSlotEntry))
4149 {
4150 ereport(pgStatRunningInCollector ? LOG : WARNING,
4151 (errmsg("corrupted statistics file \"%s\"",
4152 statfile)));
4153 goto done;
4154 }
4155
4156 /* Create hash table if we don't have it already. */
4157 if (replSlotStatHash == NULL)
4158 {
4159 HASHCTL hash_ctl;
4160
4161 hash_ctl.keysize = sizeof(NameData);
4162 hash_ctl.entrysize = sizeof(PgStat_StatReplSlotEntry);
4163 hash_ctl.hcxt = pgStatLocalContext;
4164 replSlotStatHash = hash_create("Replication slots hash",
4165 PGSTAT_REPLSLOT_HASH_SIZE,
4166 &hash_ctl,
4167 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
4168 }
4169
4170 slotent = (PgStat_StatReplSlotEntry *) hash_search(replSlotStatHash,
4171 (void *) &slotbuf.slotname,
4172 HASH_ENTER, NULL);
4173 memcpy(slotent, &slotbuf, sizeof(PgStat_StatReplSlotEntry));
4174 break;
4175 }
4176
4177 case 'E':
4178 goto done;
4179
4180 default:
4181 ereport(pgStatRunningInCollector ? LOG : WARNING,
4182 (errmsg("corrupted statistics file \"%s\"",
4183 statfile)));
4184 goto done;
4185 }
4186 }
4187
4188 done:
4189 FreeFile(fpin);
4190
4191 /* If requested to read the permanent file, also get rid of it. */
4192 if (permanent)
4193 {
4194 elog(DEBUG2, "removing permanent stats file \"%s\"", statfile);
4195 unlink(statfile);
4196 }
4197
4198 return dbhash;
4199 }
4200
4201
4202 /* ----------
4203 * pgstat_read_db_statsfile() -
4204 *
4205 * Reads in the existing statistics collector file for the given database,
4206 * filling the passed-in tables and functions hash tables.
4207 *
4208 * As in pgstat_read_statsfiles, if the permanent file is requested, it is
4209 * removed after reading.
4210 *
4211 * Note: this code has the ability to skip storing per-table or per-function
4212 * data, if NULL is passed for the corresponding hashtable. That's not used
4213 * at the moment though.
4214 * ----------
4215 */
4216 static void
pgstat_read_db_statsfile(Oid databaseid,HTAB * tabhash,HTAB * funchash,bool permanent)4217 pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
4218 bool permanent)
4219 {
4220 PgStat_StatTabEntry *tabentry;
4221 PgStat_StatTabEntry tabbuf;
4222 PgStat_StatFuncEntry funcbuf;
4223 PgStat_StatFuncEntry *funcentry;
4224 FILE *fpin;
4225 int32 format_id;
4226 bool found;
4227 char statfile[MAXPGPATH];
4228
4229 get_dbstat_filename(permanent, false, databaseid, statfile, MAXPGPATH);
4230
4231 /*
4232 * Try to open the stats file. If it doesn't exist, the backends simply
4233 * return zero for anything and the collector simply starts from scratch
4234 * with empty counters.
4235 *
4236 * ENOENT is a possibility if the stats collector is not running or has
4237 * not yet written the stats file the first time. Any other failure
4238 * condition is suspicious.
4239 */
4240 if ((fpin = AllocateFile(statfile, PG_BINARY_R)) == NULL)
4241 {
4242 if (errno != ENOENT)
4243 ereport(pgStatRunningInCollector ? LOG : WARNING,
4244 (errcode_for_file_access(),
4245 errmsg("could not open statistics file \"%s\": %m",
4246 statfile)));
4247 return;
4248 }
4249
4250 /*
4251 * Verify it's of the expected format.
4252 */
4253 if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) ||
4254 format_id != PGSTAT_FILE_FORMAT_ID)
4255 {
4256 ereport(pgStatRunningInCollector ? LOG : WARNING,
4257 (errmsg("corrupted statistics file \"%s\"", statfile)));
4258 goto done;
4259 }
4260
4261 /*
4262 * We found an existing collector stats file. Read it and put all the
4263 * hashtable entries into place.
4264 */
4265 for (;;)
4266 {
4267 switch (fgetc(fpin))
4268 {
4269 /*
4270 * 'T' A PgStat_StatTabEntry follows.
4271 */
4272 case 'T':
4273 if (fread(&tabbuf, 1, sizeof(PgStat_StatTabEntry),
4274 fpin) != sizeof(PgStat_StatTabEntry))
4275 {
4276 ereport(pgStatRunningInCollector ? LOG : WARNING,
4277 (errmsg("corrupted statistics file \"%s\"",
4278 statfile)));
4279 goto done;
4280 }
4281
4282 /*
4283 * Skip if table data not wanted.
4284 */
4285 if (tabhash == NULL)
4286 break;
4287
4288 tabentry = (PgStat_StatTabEntry *) hash_search(tabhash,
4289 (void *) &tabbuf.tableid,
4290 HASH_ENTER, &found);
4291
4292 if (found)
4293 {
4294 ereport(pgStatRunningInCollector ? LOG : WARNING,
4295 (errmsg("corrupted statistics file \"%s\"",
4296 statfile)));
4297 goto done;
4298 }
4299
4300 memcpy(tabentry, &tabbuf, sizeof(tabbuf));
4301 break;
4302
4303 /*
4304 * 'F' A PgStat_StatFuncEntry follows.
4305 */
4306 case 'F':
4307 if (fread(&funcbuf, 1, sizeof(PgStat_StatFuncEntry),
4308 fpin) != sizeof(PgStat_StatFuncEntry))
4309 {
4310 ereport(pgStatRunningInCollector ? LOG : WARNING,
4311 (errmsg("corrupted statistics file \"%s\"",
4312 statfile)));
4313 goto done;
4314 }
4315
4316 /*
4317 * Skip if function data not wanted.
4318 */
4319 if (funchash == NULL)
4320 break;
4321
4322 funcentry = (PgStat_StatFuncEntry *) hash_search(funchash,
4323 (void *) &funcbuf.functionid,
4324 HASH_ENTER, &found);
4325
4326 if (found)
4327 {
4328 ereport(pgStatRunningInCollector ? LOG : WARNING,
4329 (errmsg("corrupted statistics file \"%s\"",
4330 statfile)));
4331 goto done;
4332 }
4333
4334 memcpy(funcentry, &funcbuf, sizeof(funcbuf));
4335 break;
4336
4337 /*
4338 * 'E' The EOF marker of a complete stats file.
4339 */
4340 case 'E':
4341 goto done;
4342
4343 default:
4344 ereport(pgStatRunningInCollector ? LOG : WARNING,
4345 (errmsg("corrupted statistics file \"%s\"",
4346 statfile)));
4347 goto done;
4348 }
4349 }
4350
4351 done:
4352 FreeFile(fpin);
4353
4354 if (permanent)
4355 {
4356 elog(DEBUG2, "removing permanent stats file \"%s\"", statfile);
4357 unlink(statfile);
4358 }
4359 }
4360
4361 /* ----------
4362 * pgstat_read_db_statsfile_timestamp() -
4363 *
4364 * Attempt to determine the timestamp of the last db statfile write.
4365 * Returns true if successful; the timestamp is stored in *ts. The caller must
4366 * rely on timestamp stored in *ts iff the function returns true.
4367 *
4368 * This needs to be careful about handling databases for which no stats file
4369 * exists, such as databases without a stat entry or those not yet written:
4370 *
4371 * - if there's a database entry in the global file, return the corresponding
4372 * stats_timestamp value.
4373 *
4374 * - if there's no db stat entry (e.g. for a new or inactive database),
4375 * there's no stats_timestamp value, but also nothing to write so we return
4376 * the timestamp of the global statfile.
4377 * ----------
4378 */
4379 static bool
pgstat_read_db_statsfile_timestamp(Oid databaseid,bool permanent,TimestampTz * ts)4380 pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
4381 TimestampTz *ts)
4382 {
4383 PgStat_StatDBEntry dbentry;
4384 PgStat_GlobalStats myGlobalStats;
4385 PgStat_ArchiverStats myArchiverStats;
4386 PgStat_WalStats myWalStats;
4387 PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS];
4388 PgStat_StatReplSlotEntry myReplSlotStats;
4389 FILE *fpin;
4390 int32 format_id;
4391 const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
4392
4393 /*
4394 * Try to open the stats file. As above, anything but ENOENT is worthy of
4395 * complaining about.
4396 */
4397 if ((fpin = AllocateFile(statfile, PG_BINARY_R)) == NULL)
4398 {
4399 if (errno != ENOENT)
4400 ereport(pgStatRunningInCollector ? LOG : WARNING,
4401 (errcode_for_file_access(),
4402 errmsg("could not open statistics file \"%s\": %m",
4403 statfile)));
4404 return false;
4405 }
4406
4407 /*
4408 * Verify it's of the expected format.
4409 */
4410 if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) ||
4411 format_id != PGSTAT_FILE_FORMAT_ID)
4412 {
4413 ereport(pgStatRunningInCollector ? LOG : WARNING,
4414 (errmsg("corrupted statistics file \"%s\"", statfile)));
4415 FreeFile(fpin);
4416 return false;
4417 }
4418
4419 /*
4420 * Read global stats struct
4421 */
4422 if (fread(&myGlobalStats, 1, sizeof(myGlobalStats),
4423 fpin) != sizeof(myGlobalStats))
4424 {
4425 ereport(pgStatRunningInCollector ? LOG : WARNING,
4426 (errmsg("corrupted statistics file \"%s\"", statfile)));
4427 FreeFile(fpin);
4428 return false;
4429 }
4430
4431 /*
4432 * Read archiver stats struct
4433 */
4434 if (fread(&myArchiverStats, 1, sizeof(myArchiverStats),
4435 fpin) != sizeof(myArchiverStats))
4436 {
4437 ereport(pgStatRunningInCollector ? LOG : WARNING,
4438 (errmsg("corrupted statistics file \"%s\"", statfile)));
4439 FreeFile(fpin);
4440 return false;
4441 }
4442
4443 /*
4444 * Read WAL stats struct
4445 */
4446 if (fread(&myWalStats, 1, sizeof(myWalStats), fpin) != sizeof(myWalStats))
4447 {
4448 ereport(pgStatRunningInCollector ? LOG : WARNING,
4449 (errmsg("corrupted statistics file \"%s\"", statfile)));
4450 FreeFile(fpin);
4451 return false;
4452 }
4453
4454 /*
4455 * Read SLRU stats struct
4456 */
4457 if (fread(mySLRUStats, 1, sizeof(mySLRUStats), fpin) != sizeof(mySLRUStats))
4458 {
4459 ereport(pgStatRunningInCollector ? LOG : WARNING,
4460 (errmsg("corrupted statistics file \"%s\"", statfile)));
4461 FreeFile(fpin);
4462 return false;
4463 }
4464
4465 /* By default, we're going to return the timestamp of the global file. */
4466 *ts = myGlobalStats.stats_timestamp;
4467
4468 /*
4469 * We found an existing collector stats file. Read it and look for a
4470 * record for the requested database. If found, use its timestamp.
4471 */
4472 for (;;)
4473 {
4474 switch (fgetc(fpin))
4475 {
4476 /*
4477 * 'D' A PgStat_StatDBEntry struct describing a database
4478 * follows.
4479 */
4480 case 'D':
4481 if (fread(&dbentry, 1, offsetof(PgStat_StatDBEntry, tables),
4482 fpin) != offsetof(PgStat_StatDBEntry, tables))
4483 {
4484 ereport(pgStatRunningInCollector ? LOG : WARNING,
4485 (errmsg("corrupted statistics file \"%s\"",
4486 statfile)));
4487 FreeFile(fpin);
4488 return false;
4489 }
4490
4491 /*
4492 * If this is the DB we're looking for, save its timestamp and
4493 * we're done.
4494 */
4495 if (dbentry.databaseid == databaseid)
4496 {
4497 *ts = dbentry.stats_timestamp;
4498 goto done;
4499 }
4500
4501 break;
4502
4503 /*
4504 * 'R' A PgStat_StatReplSlotEntry struct describing a
4505 * replication slot follows.
4506 */
4507 case 'R':
4508 if (fread(&myReplSlotStats, 1, sizeof(PgStat_StatReplSlotEntry), fpin)
4509 != sizeof(PgStat_StatReplSlotEntry))
4510 {
4511 ereport(pgStatRunningInCollector ? LOG : WARNING,
4512 (errmsg("corrupted statistics file \"%s\"",
4513 statfile)));
4514 FreeFile(fpin);
4515 return false;
4516 }
4517 break;
4518
4519 case 'E':
4520 goto done;
4521
4522 default:
4523 {
4524 ereport(pgStatRunningInCollector ? LOG : WARNING,
4525 (errmsg("corrupted statistics file \"%s\"",
4526 statfile)));
4527 FreeFile(fpin);
4528 return false;
4529 }
4530 }
4531 }
4532
4533 done:
4534 FreeFile(fpin);
4535 return true;
4536 }
4537
4538 /*
4539 * If not already done, read the statistics collector stats file into
4540 * some hash tables. The results will be kept until pgstat_clear_snapshot()
4541 * is called (typically, at end of transaction).
4542 */
4543 static void
backend_read_statsfile(void)4544 backend_read_statsfile(void)
4545 {
4546 TimestampTz min_ts = 0;
4547 TimestampTz ref_ts = 0;
4548 Oid inquiry_db;
4549 int count;
4550
4551 /* already read it? */
4552 if (pgStatDBHash)
4553 return;
4554 Assert(!pgStatRunningInCollector);
4555
4556 /*
4557 * In a normal backend, we check staleness of the data for our own DB, and
4558 * so we send MyDatabaseId in inquiry messages. In the autovac launcher,
4559 * check staleness of the shared-catalog data, and send InvalidOid in
4560 * inquiry messages so as not to force writing unnecessary data.
4561 */
4562 if (IsAutoVacuumLauncherProcess())
4563 inquiry_db = InvalidOid;
4564 else
4565 inquiry_db = MyDatabaseId;
4566
4567 /*
4568 * Loop until fresh enough stats file is available or we ran out of time.
4569 * The stats inquiry message is sent repeatedly in case collector drops
4570 * it; but not every single time, as that just swamps the collector.
4571 */
4572 for (count = 0; count < PGSTAT_POLL_LOOP_COUNT; count++)
4573 {
4574 bool ok;
4575 TimestampTz file_ts = 0;
4576 TimestampTz cur_ts;
4577
4578 CHECK_FOR_INTERRUPTS();
4579
4580 ok = pgstat_read_db_statsfile_timestamp(inquiry_db, false, &file_ts);
4581
4582 cur_ts = GetCurrentTimestamp();
4583 /* Calculate min acceptable timestamp, if we didn't already */
4584 if (count == 0 || cur_ts < ref_ts)
4585 {
4586 /*
4587 * We set the minimum acceptable timestamp to PGSTAT_STAT_INTERVAL
4588 * msec before now. This indirectly ensures that the collector
4589 * needn't write the file more often than PGSTAT_STAT_INTERVAL. In
4590 * an autovacuum worker, however, we want a lower delay to avoid
4591 * using stale data, so we use PGSTAT_RETRY_DELAY (since the
4592 * number of workers is low, this shouldn't be a problem).
4593 *
4594 * We don't recompute min_ts after sleeping, except in the
4595 * unlikely case that cur_ts went backwards. So we might end up
4596 * accepting a file a bit older than PGSTAT_STAT_INTERVAL. In
4597 * practice that shouldn't happen, though, as long as the sleep
4598 * time is less than PGSTAT_STAT_INTERVAL; and we don't want to
4599 * tell the collector that our cutoff time is less than what we'd
4600 * actually accept.
4601 */
4602 ref_ts = cur_ts;
4603 if (IsAutoVacuumWorkerProcess())
4604 min_ts = TimestampTzPlusMilliseconds(ref_ts,
4605 -PGSTAT_RETRY_DELAY);
4606 else
4607 min_ts = TimestampTzPlusMilliseconds(ref_ts,
4608 -PGSTAT_STAT_INTERVAL);
4609 }
4610
4611 /*
4612 * If the file timestamp is actually newer than cur_ts, we must have
4613 * had a clock glitch (system time went backwards) or there is clock
4614 * skew between our processor and the stats collector's processor.
4615 * Accept the file, but send an inquiry message anyway to make
4616 * pgstat_recv_inquiry do a sanity check on the collector's time.
4617 */
4618 if (ok && file_ts > cur_ts)
4619 {
4620 /*
4621 * A small amount of clock skew between processors isn't terribly
4622 * surprising, but a large difference is worth logging. We
4623 * arbitrarily define "large" as 1000 msec.
4624 */
4625 if (file_ts >= TimestampTzPlusMilliseconds(cur_ts, 1000))
4626 {
4627 char *filetime;
4628 char *mytime;
4629
4630 /* Copy because timestamptz_to_str returns a static buffer */
4631 filetime = pstrdup(timestamptz_to_str(file_ts));
4632 mytime = pstrdup(timestamptz_to_str(cur_ts));
4633 ereport(LOG,
4634 (errmsg("statistics collector's time %s is later than backend local time %s",
4635 filetime, mytime)));
4636 pfree(filetime);
4637 pfree(mytime);
4638 }
4639
4640 pgstat_send_inquiry(cur_ts, min_ts, inquiry_db);
4641 break;
4642 }
4643
4644 /* Normal acceptance case: file is not older than cutoff time */
4645 if (ok && file_ts >= min_ts)
4646 break;
4647
4648 /* Not there or too old, so kick the collector and wait a bit */
4649 if ((count % PGSTAT_INQ_LOOP_COUNT) == 0)
4650 pgstat_send_inquiry(cur_ts, min_ts, inquiry_db);
4651
4652 pg_usleep(PGSTAT_RETRY_DELAY * 1000L);
4653 }
4654
4655 if (count >= PGSTAT_POLL_LOOP_COUNT)
4656 ereport(LOG,
4657 (errmsg("using stale statistics instead of current ones "
4658 "because stats collector is not responding")));
4659
4660 /*
4661 * Autovacuum launcher wants stats about all databases, but a shallow read
4662 * is sufficient. Regular backends want a deep read for just the tables
4663 * they can see (MyDatabaseId + shared catalogs).
4664 */
4665 if (IsAutoVacuumLauncherProcess())
4666 pgStatDBHash = pgstat_read_statsfiles(InvalidOid, false, false);
4667 else
4668 pgStatDBHash = pgstat_read_statsfiles(MyDatabaseId, false, true);
4669 }
4670
4671
4672 /* ----------
4673 * pgstat_setup_memcxt() -
4674 *
4675 * Create pgStatLocalContext, if not already done.
4676 * ----------
4677 */
4678 static void
pgstat_setup_memcxt(void)4679 pgstat_setup_memcxt(void)
4680 {
4681 if (!pgStatLocalContext)
4682 pgStatLocalContext = AllocSetContextCreate(TopMemoryContext,
4683 "Statistics snapshot",
4684 ALLOCSET_SMALL_SIZES);
4685 }
4686
4687
4688 /* ----------
4689 * pgstat_clear_snapshot() -
4690 *
4691 * Discard any data collected in the current transaction. Any subsequent
4692 * request will cause new snapshots to be read.
4693 *
4694 * This is also invoked during transaction commit or abort to discard
4695 * the no-longer-wanted snapshot.
4696 * ----------
4697 */
4698 void
pgstat_clear_snapshot(void)4699 pgstat_clear_snapshot(void)
4700 {
4701 /* Release memory, if any was allocated */
4702 if (pgStatLocalContext)
4703 MemoryContextDelete(pgStatLocalContext);
4704
4705 /* Reset variables */
4706 pgStatLocalContext = NULL;
4707 pgStatDBHash = NULL;
4708 replSlotStatHash = NULL;
4709
4710 /*
4711 * Historically the backend_status.c facilities lived in this file, and
4712 * were reset with the same function. For now keep it that way, and
4713 * forward the reset request.
4714 */
4715 pgstat_clear_backend_activity_snapshot();
4716 }
4717
4718
4719 /* ----------
4720 * pgstat_recv_inquiry() -
4721 *
4722 * Process stat inquiry requests.
4723 * ----------
4724 */
4725 static void
pgstat_recv_inquiry(PgStat_MsgInquiry * msg,int len)4726 pgstat_recv_inquiry(PgStat_MsgInquiry *msg, int len)
4727 {
4728 PgStat_StatDBEntry *dbentry;
4729
4730 elog(DEBUG2, "received inquiry for database %u", msg->databaseid);
4731
4732 /*
4733 * If there's already a write request for this DB, there's nothing to do.
4734 *
4735 * Note that if a request is found, we return early and skip the below
4736 * check for clock skew. This is okay, since the only way for a DB
4737 * request to be present in the list is that we have been here since the
4738 * last write round. It seems sufficient to check for clock skew once per
4739 * write round.
4740 */
4741 if (list_member_oid(pending_write_requests, msg->databaseid))
4742 return;
4743
4744 /*
4745 * Check to see if we last wrote this database at a time >= the requested
4746 * cutoff time. If so, this is a stale request that was generated before
4747 * we updated the DB file, and we don't need to do so again.
4748 *
4749 * If the requestor's local clock time is older than stats_timestamp, we
4750 * should suspect a clock glitch, ie system time going backwards; though
4751 * the more likely explanation is just delayed message receipt. It is
4752 * worth expending a GetCurrentTimestamp call to be sure, since a large
4753 * retreat in the system clock reading could otherwise cause us to neglect
4754 * to update the stats file for a long time.
4755 */
4756 dbentry = pgstat_get_db_entry(msg->databaseid, false);
4757 if (dbentry == NULL)
4758 {
4759 /*
4760 * We have no data for this DB. Enter a write request anyway so that
4761 * the global stats will get updated. This is needed to prevent
4762 * backend_read_statsfile from waiting for data that we cannot supply,
4763 * in the case of a new DB that nobody has yet reported any stats for.
4764 * See the behavior of pgstat_read_db_statsfile_timestamp.
4765 */
4766 }
4767 else if (msg->clock_time < dbentry->stats_timestamp)
4768 {
4769 TimestampTz cur_ts = GetCurrentTimestamp();
4770
4771 if (cur_ts < dbentry->stats_timestamp)
4772 {
4773 /*
4774 * Sure enough, time went backwards. Force a new stats file write
4775 * to get back in sync; but first, log a complaint.
4776 */
4777 char *writetime;
4778 char *mytime;
4779
4780 /* Copy because timestamptz_to_str returns a static buffer */
4781 writetime = pstrdup(timestamptz_to_str(dbentry->stats_timestamp));
4782 mytime = pstrdup(timestamptz_to_str(cur_ts));
4783 ereport(LOG,
4784 (errmsg("stats_timestamp %s is later than collector's time %s for database %u",
4785 writetime, mytime, dbentry->databaseid)));
4786 pfree(writetime);
4787 pfree(mytime);
4788 }
4789 else
4790 {
4791 /*
4792 * Nope, it's just an old request. Assuming msg's clock_time is
4793 * >= its cutoff_time, it must be stale, so we can ignore it.
4794 */
4795 return;
4796 }
4797 }
4798 else if (msg->cutoff_time <= dbentry->stats_timestamp)
4799 {
4800 /* Stale request, ignore it */
4801 return;
4802 }
4803
4804 /*
4805 * We need to write this DB, so create a request.
4806 */
4807 pending_write_requests = lappend_oid(pending_write_requests,
4808 msg->databaseid);
4809 }
4810
4811
4812 /* ----------
4813 * pgstat_recv_tabstat() -
4814 *
4815 * Count what the backend has done.
4816 * ----------
4817 */
4818 static void
pgstat_recv_tabstat(PgStat_MsgTabstat * msg,int len)4819 pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len)
4820 {
4821 PgStat_StatDBEntry *dbentry;
4822 PgStat_StatTabEntry *tabentry;
4823 int i;
4824 bool found;
4825
4826 dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
4827
4828 /*
4829 * Update database-wide stats.
4830 */
4831 dbentry->n_xact_commit += (PgStat_Counter) (msg->m_xact_commit);
4832 dbentry->n_xact_rollback += (PgStat_Counter) (msg->m_xact_rollback);
4833 dbentry->n_block_read_time += msg->m_block_read_time;
4834 dbentry->n_block_write_time += msg->m_block_write_time;
4835
4836 dbentry->total_session_time += msg->m_session_time;
4837 dbentry->total_active_time += msg->m_active_time;
4838 dbentry->total_idle_in_xact_time += msg->m_idle_in_xact_time;
4839
4840 /*
4841 * Process all table entries in the message.
4842 */
4843 for (i = 0; i < msg->m_nentries; i++)
4844 {
4845 PgStat_TableEntry *tabmsg = &(msg->m_entry[i]);
4846
4847 tabentry = (PgStat_StatTabEntry *) hash_search(dbentry->tables,
4848 (void *) &(tabmsg->t_id),
4849 HASH_ENTER, &found);
4850
4851 if (!found)
4852 {
4853 /*
4854 * If it's a new table entry, initialize counters to the values we
4855 * just got.
4856 */
4857 tabentry->numscans = tabmsg->t_counts.t_numscans;
4858 tabentry->tuples_returned = tabmsg->t_counts.t_tuples_returned;
4859 tabentry->tuples_fetched = tabmsg->t_counts.t_tuples_fetched;
4860 tabentry->tuples_inserted = tabmsg->t_counts.t_tuples_inserted;
4861 tabentry->tuples_updated = tabmsg->t_counts.t_tuples_updated;
4862 tabentry->tuples_deleted = tabmsg->t_counts.t_tuples_deleted;
4863 tabentry->tuples_hot_updated = tabmsg->t_counts.t_tuples_hot_updated;
4864 tabentry->n_live_tuples = tabmsg->t_counts.t_delta_live_tuples;
4865 tabentry->n_dead_tuples = tabmsg->t_counts.t_delta_dead_tuples;
4866 tabentry->changes_since_analyze = tabmsg->t_counts.t_changed_tuples;
4867 tabentry->inserts_since_vacuum = tabmsg->t_counts.t_tuples_inserted;
4868 tabentry->blocks_fetched = tabmsg->t_counts.t_blocks_fetched;
4869 tabentry->blocks_hit = tabmsg->t_counts.t_blocks_hit;
4870
4871 tabentry->vacuum_timestamp = 0;
4872 tabentry->vacuum_count = 0;
4873 tabentry->autovac_vacuum_timestamp = 0;
4874 tabentry->autovac_vacuum_count = 0;
4875 tabentry->analyze_timestamp = 0;
4876 tabentry->analyze_count = 0;
4877 tabentry->autovac_analyze_timestamp = 0;
4878 tabentry->autovac_analyze_count = 0;
4879 }
4880 else
4881 {
4882 /*
4883 * Otherwise add the values to the existing entry.
4884 */
4885 tabentry->numscans += tabmsg->t_counts.t_numscans;
4886 tabentry->tuples_returned += tabmsg->t_counts.t_tuples_returned;
4887 tabentry->tuples_fetched += tabmsg->t_counts.t_tuples_fetched;
4888 tabentry->tuples_inserted += tabmsg->t_counts.t_tuples_inserted;
4889 tabentry->tuples_updated += tabmsg->t_counts.t_tuples_updated;
4890 tabentry->tuples_deleted += tabmsg->t_counts.t_tuples_deleted;
4891 tabentry->tuples_hot_updated += tabmsg->t_counts.t_tuples_hot_updated;
4892 /* If table was truncated, first reset the live/dead counters */
4893 if (tabmsg->t_counts.t_truncated)
4894 {
4895 tabentry->n_live_tuples = 0;
4896 tabentry->n_dead_tuples = 0;
4897 tabentry->inserts_since_vacuum = 0;
4898 }
4899 tabentry->n_live_tuples += tabmsg->t_counts.t_delta_live_tuples;
4900 tabentry->n_dead_tuples += tabmsg->t_counts.t_delta_dead_tuples;
4901 tabentry->changes_since_analyze += tabmsg->t_counts.t_changed_tuples;
4902 tabentry->inserts_since_vacuum += tabmsg->t_counts.t_tuples_inserted;
4903 tabentry->blocks_fetched += tabmsg->t_counts.t_blocks_fetched;
4904 tabentry->blocks_hit += tabmsg->t_counts.t_blocks_hit;
4905 }
4906
4907 /* Clamp n_live_tuples in case of negative delta_live_tuples */
4908 tabentry->n_live_tuples = Max(tabentry->n_live_tuples, 0);
4909 /* Likewise for n_dead_tuples */
4910 tabentry->n_dead_tuples = Max(tabentry->n_dead_tuples, 0);
4911
4912 /*
4913 * Add per-table stats to the per-database entry, too.
4914 */
4915 dbentry->n_tuples_returned += tabmsg->t_counts.t_tuples_returned;
4916 dbentry->n_tuples_fetched += tabmsg->t_counts.t_tuples_fetched;
4917 dbentry->n_tuples_inserted += tabmsg->t_counts.t_tuples_inserted;
4918 dbentry->n_tuples_updated += tabmsg->t_counts.t_tuples_updated;
4919 dbentry->n_tuples_deleted += tabmsg->t_counts.t_tuples_deleted;
4920 dbentry->n_blocks_fetched += tabmsg->t_counts.t_blocks_fetched;
4921 dbentry->n_blocks_hit += tabmsg->t_counts.t_blocks_hit;
4922 }
4923 }
4924
4925
4926 /* ----------
4927 * pgstat_recv_tabpurge() -
4928 *
4929 * Arrange for dead table removal.
4930 * ----------
4931 */
4932 static void
pgstat_recv_tabpurge(PgStat_MsgTabpurge * msg,int len)4933 pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len)
4934 {
4935 PgStat_StatDBEntry *dbentry;
4936 int i;
4937
4938 dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
4939
4940 /*
4941 * No need to purge if we don't even know the database.
4942 */
4943 if (!dbentry || !dbentry->tables)
4944 return;
4945
4946 /*
4947 * Process all table entries in the message.
4948 */
4949 for (i = 0; i < msg->m_nentries; i++)
4950 {
4951 /* Remove from hashtable if present; we don't care if it's not. */
4952 (void) hash_search(dbentry->tables,
4953 (void *) &(msg->m_tableid[i]),
4954 HASH_REMOVE, NULL);
4955 }
4956 }
4957
4958
4959 /* ----------
4960 * pgstat_recv_dropdb() -
4961 *
4962 * Arrange for dead database removal
4963 * ----------
4964 */
4965 static void
pgstat_recv_dropdb(PgStat_MsgDropdb * msg,int len)4966 pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len)
4967 {
4968 Oid dbid = msg->m_databaseid;
4969 PgStat_StatDBEntry *dbentry;
4970
4971 /*
4972 * Lookup the database in the hashtable.
4973 */
4974 dbentry = pgstat_get_db_entry(dbid, false);
4975
4976 /*
4977 * If found, remove it (along with the db statfile).
4978 */
4979 if (dbentry)
4980 {
4981 char statfile[MAXPGPATH];
4982
4983 get_dbstat_filename(false, false, dbid, statfile, MAXPGPATH);
4984
4985 elog(DEBUG2, "removing stats file \"%s\"", statfile);
4986 unlink(statfile);
4987
4988 if (dbentry->tables != NULL)
4989 hash_destroy(dbentry->tables);
4990 if (dbentry->functions != NULL)
4991 hash_destroy(dbentry->functions);
4992
4993 if (hash_search(pgStatDBHash,
4994 (void *) &dbid,
4995 HASH_REMOVE, NULL) == NULL)
4996 ereport(ERROR,
4997 (errmsg("database hash table corrupted during cleanup --- abort")));
4998 }
4999 }
5000
5001
5002 /* ----------
5003 * pgstat_recv_resetcounter() -
5004 *
5005 * Reset the statistics for the specified database.
5006 * ----------
5007 */
5008 static void
pgstat_recv_resetcounter(PgStat_MsgResetcounter * msg,int len)5009 pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len)
5010 {
5011 PgStat_StatDBEntry *dbentry;
5012
5013 /*
5014 * Lookup the database in the hashtable. Nothing to do if not there.
5015 */
5016 dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
5017
5018 if (!dbentry)
5019 return;
5020
5021 /*
5022 * We simply throw away all the database's table entries by recreating a
5023 * new hash table for them.
5024 */
5025 if (dbentry->tables != NULL)
5026 hash_destroy(dbentry->tables);
5027 if (dbentry->functions != NULL)
5028 hash_destroy(dbentry->functions);
5029
5030 dbentry->tables = NULL;
5031 dbentry->functions = NULL;
5032
5033 /*
5034 * Reset database-level stats, too. This creates empty hash tables for
5035 * tables and functions.
5036 */
5037 reset_dbentry_counters(dbentry);
5038 }
5039
5040 /* ----------
5041 * pgstat_recv_resetsharedcounter() -
5042 *
5043 * Reset some shared statistics of the cluster.
5044 * ----------
5045 */
5046 static void
pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter * msg,int len)5047 pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, int len)
5048 {
5049 if (msg->m_resettarget == RESET_BGWRITER)
5050 {
5051 /* Reset the global background writer statistics for the cluster. */
5052 memset(&globalStats, 0, sizeof(globalStats));
5053 globalStats.stat_reset_timestamp = GetCurrentTimestamp();
5054 }
5055 else if (msg->m_resettarget == RESET_ARCHIVER)
5056 {
5057 /* Reset the archiver statistics for the cluster. */
5058 memset(&archiverStats, 0, sizeof(archiverStats));
5059 archiverStats.stat_reset_timestamp = GetCurrentTimestamp();
5060 }
5061 else if (msg->m_resettarget == RESET_WAL)
5062 {
5063 /* Reset the WAL statistics for the cluster. */
5064 memset(&walStats, 0, sizeof(walStats));
5065 walStats.stat_reset_timestamp = GetCurrentTimestamp();
5066 }
5067
5068 /*
5069 * Presumably the sender of this message validated the target, don't
5070 * complain here if it's not valid
5071 */
5072 }
5073
5074 /* ----------
5075 * pgstat_recv_resetsinglecounter() -
5076 *
5077 * Reset a statistics for a single object
5078 * ----------
5079 */
5080 static void
pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter * msg,int len)5081 pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len)
5082 {
5083 PgStat_StatDBEntry *dbentry;
5084
5085 dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
5086
5087 if (!dbentry)
5088 return;
5089
5090 /* Set the reset timestamp for the whole database */
5091 dbentry->stat_reset_timestamp = GetCurrentTimestamp();
5092
5093 /* Remove object if it exists, ignore it if not */
5094 if (msg->m_resettype == RESET_TABLE)
5095 (void) hash_search(dbentry->tables, (void *) &(msg->m_objectid),
5096 HASH_REMOVE, NULL);
5097 else if (msg->m_resettype == RESET_FUNCTION)
5098 (void) hash_search(dbentry->functions, (void *) &(msg->m_objectid),
5099 HASH_REMOVE, NULL);
5100 }
5101
5102 /* ----------
5103 * pgstat_recv_resetslrucounter() -
5104 *
5105 * Reset some SLRU statistics of the cluster.
5106 * ----------
5107 */
5108 static void
pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter * msg,int len)5109 pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter *msg, int len)
5110 {
5111 int i;
5112 TimestampTz ts = GetCurrentTimestamp();
5113
5114 for (i = 0; i < SLRU_NUM_ELEMENTS; i++)
5115 {
5116 /* reset entry with the given index, or all entries (index is -1) */
5117 if ((msg->m_index == -1) || (msg->m_index == i))
5118 {
5119 memset(&slruStats[i], 0, sizeof(slruStats[i]));
5120 slruStats[i].stat_reset_timestamp = ts;
5121 }
5122 }
5123 }
5124
5125 /* ----------
5126 * pgstat_recv_resetreplslotcounter() -
5127 *
5128 * Reset some replication slot statistics of the cluster.
5129 * ----------
5130 */
5131 static void
pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter * msg,int len)5132 pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg,
5133 int len)
5134 {
5135 PgStat_StatReplSlotEntry *slotent;
5136 TimestampTz ts;
5137
5138 /* Return if we don't have replication slot statistics */
5139 if (replSlotStatHash == NULL)
5140 return;
5141
5142 ts = GetCurrentTimestamp();
5143 if (msg->clearall)
5144 {
5145 HASH_SEQ_STATUS sstat;
5146
5147 hash_seq_init(&sstat, replSlotStatHash);
5148 while ((slotent = (PgStat_StatReplSlotEntry *) hash_seq_search(&sstat)) != NULL)
5149 pgstat_reset_replslot(slotent, ts);
5150 }
5151 else
5152 {
5153 /* Get the slot statistics to reset */
5154 slotent = pgstat_get_replslot_entry(msg->m_slotname, false);
5155
5156 /*
5157 * Nothing to do if the given slot entry is not found. This could
5158 * happen when the slot with the given name is removed and the
5159 * corresponding statistics entry is also removed before receiving the
5160 * reset message.
5161 */
5162 if (!slotent)
5163 return;
5164
5165 /* Reset the stats for the requested replication slot */
5166 pgstat_reset_replslot(slotent, ts);
5167 }
5168 }
5169
5170
5171 /* ----------
5172 * pgstat_recv_autovac() -
5173 *
5174 * Process an autovacuum signaling message.
5175 * ----------
5176 */
5177 static void
pgstat_recv_autovac(PgStat_MsgAutovacStart * msg,int len)5178 pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len)
5179 {
5180 PgStat_StatDBEntry *dbentry;
5181
5182 /*
5183 * Store the last autovacuum time in the database's hashtable entry.
5184 */
5185 dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5186
5187 dbentry->last_autovac_time = msg->m_start_time;
5188 }
5189
5190 /* ----------
5191 * pgstat_recv_vacuum() -
5192 *
5193 * Process a VACUUM message.
5194 * ----------
5195 */
5196 static void
pgstat_recv_vacuum(PgStat_MsgVacuum * msg,int len)5197 pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len)
5198 {
5199 PgStat_StatDBEntry *dbentry;
5200 PgStat_StatTabEntry *tabentry;
5201
5202 /*
5203 * Store the data in the table's hashtable entry.
5204 */
5205 dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5206
5207 tabentry = pgstat_get_tab_entry(dbentry, msg->m_tableoid, true);
5208
5209 tabentry->n_live_tuples = msg->m_live_tuples;
5210 tabentry->n_dead_tuples = msg->m_dead_tuples;
5211
5212 /*
5213 * It is quite possible that a non-aggressive VACUUM ended up skipping
5214 * various pages, however, we'll zero the insert counter here regardless.
5215 * It's currently used only to track when we need to perform an "insert"
5216 * autovacuum, which are mainly intended to freeze newly inserted tuples.
5217 * Zeroing this may just mean we'll not try to vacuum the table again
5218 * until enough tuples have been inserted to trigger another insert
5219 * autovacuum. An anti-wraparound autovacuum will catch any persistent
5220 * stragglers.
5221 */
5222 tabentry->inserts_since_vacuum = 0;
5223
5224 if (msg->m_autovacuum)
5225 {
5226 tabentry->autovac_vacuum_timestamp = msg->m_vacuumtime;
5227 tabentry->autovac_vacuum_count++;
5228 }
5229 else
5230 {
5231 tabentry->vacuum_timestamp = msg->m_vacuumtime;
5232 tabentry->vacuum_count++;
5233 }
5234 }
5235
5236 /* ----------
5237 * pgstat_recv_analyze() -
5238 *
5239 * Process an ANALYZE message.
5240 * ----------
5241 */
5242 static void
pgstat_recv_analyze(PgStat_MsgAnalyze * msg,int len)5243 pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len)
5244 {
5245 PgStat_StatDBEntry *dbentry;
5246 PgStat_StatTabEntry *tabentry;
5247
5248 /*
5249 * Store the data in the table's hashtable entry.
5250 */
5251 dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5252
5253 tabentry = pgstat_get_tab_entry(dbentry, msg->m_tableoid, true);
5254
5255 tabentry->n_live_tuples = msg->m_live_tuples;
5256 tabentry->n_dead_tuples = msg->m_dead_tuples;
5257
5258 /*
5259 * If commanded, reset changes_since_analyze to zero. This forgets any
5260 * changes that were committed while the ANALYZE was in progress, but we
5261 * have no good way to estimate how many of those there were.
5262 */
5263 if (msg->m_resetcounter)
5264 tabentry->changes_since_analyze = 0;
5265
5266 if (msg->m_autovacuum)
5267 {
5268 tabentry->autovac_analyze_timestamp = msg->m_analyzetime;
5269 tabentry->autovac_analyze_count++;
5270 }
5271 else
5272 {
5273 tabentry->analyze_timestamp = msg->m_analyzetime;
5274 tabentry->analyze_count++;
5275 }
5276 }
5277
5278
5279 /* ----------
5280 * pgstat_recv_archiver() -
5281 *
5282 * Process a ARCHIVER message.
5283 * ----------
5284 */
5285 static void
pgstat_recv_archiver(PgStat_MsgArchiver * msg,int len)5286 pgstat_recv_archiver(PgStat_MsgArchiver *msg, int len)
5287 {
5288 if (msg->m_failed)
5289 {
5290 /* Failed archival attempt */
5291 ++archiverStats.failed_count;
5292 memcpy(archiverStats.last_failed_wal, msg->m_xlog,
5293 sizeof(archiverStats.last_failed_wal));
5294 archiverStats.last_failed_timestamp = msg->m_timestamp;
5295 }
5296 else
5297 {
5298 /* Successful archival operation */
5299 ++archiverStats.archived_count;
5300 memcpy(archiverStats.last_archived_wal, msg->m_xlog,
5301 sizeof(archiverStats.last_archived_wal));
5302 archiverStats.last_archived_timestamp = msg->m_timestamp;
5303 }
5304 }
5305
5306 /* ----------
5307 * pgstat_recv_bgwriter() -
5308 *
5309 * Process a BGWRITER message.
5310 * ----------
5311 */
5312 static void
pgstat_recv_bgwriter(PgStat_MsgBgWriter * msg,int len)5313 pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len)
5314 {
5315 globalStats.timed_checkpoints += msg->m_timed_checkpoints;
5316 globalStats.requested_checkpoints += msg->m_requested_checkpoints;
5317 globalStats.checkpoint_write_time += msg->m_checkpoint_write_time;
5318 globalStats.checkpoint_sync_time += msg->m_checkpoint_sync_time;
5319 globalStats.buf_written_checkpoints += msg->m_buf_written_checkpoints;
5320 globalStats.buf_written_clean += msg->m_buf_written_clean;
5321 globalStats.maxwritten_clean += msg->m_maxwritten_clean;
5322 globalStats.buf_written_backend += msg->m_buf_written_backend;
5323 globalStats.buf_fsync_backend += msg->m_buf_fsync_backend;
5324 globalStats.buf_alloc += msg->m_buf_alloc;
5325 }
5326
5327 /* ----------
5328 * pgstat_recv_wal() -
5329 *
5330 * Process a WAL message.
5331 * ----------
5332 */
5333 static void
pgstat_recv_wal(PgStat_MsgWal * msg,int len)5334 pgstat_recv_wal(PgStat_MsgWal *msg, int len)
5335 {
5336 walStats.wal_records += msg->m_wal_records;
5337 walStats.wal_fpi += msg->m_wal_fpi;
5338 walStats.wal_bytes += msg->m_wal_bytes;
5339 walStats.wal_buffers_full += msg->m_wal_buffers_full;
5340 walStats.wal_write += msg->m_wal_write;
5341 walStats.wal_sync += msg->m_wal_sync;
5342 walStats.wal_write_time += msg->m_wal_write_time;
5343 walStats.wal_sync_time += msg->m_wal_sync_time;
5344 }
5345
5346 /* ----------
5347 * pgstat_recv_slru() -
5348 *
5349 * Process a SLRU message.
5350 * ----------
5351 */
5352 static void
pgstat_recv_slru(PgStat_MsgSLRU * msg,int len)5353 pgstat_recv_slru(PgStat_MsgSLRU *msg, int len)
5354 {
5355 slruStats[msg->m_index].blocks_zeroed += msg->m_blocks_zeroed;
5356 slruStats[msg->m_index].blocks_hit += msg->m_blocks_hit;
5357 slruStats[msg->m_index].blocks_read += msg->m_blocks_read;
5358 slruStats[msg->m_index].blocks_written += msg->m_blocks_written;
5359 slruStats[msg->m_index].blocks_exists += msg->m_blocks_exists;
5360 slruStats[msg->m_index].flush += msg->m_flush;
5361 slruStats[msg->m_index].truncate += msg->m_truncate;
5362 }
5363
5364 /* ----------
5365 * pgstat_recv_recoveryconflict() -
5366 *
5367 * Process a RECOVERYCONFLICT message.
5368 * ----------
5369 */
5370 static void
pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict * msg,int len)5371 pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len)
5372 {
5373 PgStat_StatDBEntry *dbentry;
5374
5375 dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5376
5377 switch (msg->m_reason)
5378 {
5379 case PROCSIG_RECOVERY_CONFLICT_DATABASE:
5380
5381 /*
5382 * Since we drop the information about the database as soon as it
5383 * replicates, there is no point in counting these conflicts.
5384 */
5385 break;
5386 case PROCSIG_RECOVERY_CONFLICT_TABLESPACE:
5387 dbentry->n_conflict_tablespace++;
5388 break;
5389 case PROCSIG_RECOVERY_CONFLICT_LOCK:
5390 dbentry->n_conflict_lock++;
5391 break;
5392 case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
5393 dbentry->n_conflict_snapshot++;
5394 break;
5395 case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN:
5396 dbentry->n_conflict_bufferpin++;
5397 break;
5398 case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
5399 dbentry->n_conflict_startup_deadlock++;
5400 break;
5401 }
5402 }
5403
5404 /* ----------
5405 * pgstat_recv_deadlock() -
5406 *
5407 * Process a DEADLOCK message.
5408 * ----------
5409 */
5410 static void
pgstat_recv_deadlock(PgStat_MsgDeadlock * msg,int len)5411 pgstat_recv_deadlock(PgStat_MsgDeadlock *msg, int len)
5412 {
5413 PgStat_StatDBEntry *dbentry;
5414
5415 dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5416
5417 dbentry->n_deadlocks++;
5418 }
5419
5420 /* ----------
5421 * pgstat_recv_checksum_failure() -
5422 *
5423 * Process a CHECKSUMFAILURE message.
5424 * ----------
5425 */
5426 static void
pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure * msg,int len)5427 pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len)
5428 {
5429 PgStat_StatDBEntry *dbentry;
5430
5431 dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5432
5433 dbentry->n_checksum_failures += msg->m_failurecount;
5434 dbentry->last_checksum_failure = msg->m_failure_time;
5435 }
5436
5437 /* ----------
5438 * pgstat_recv_replslot() -
5439 *
5440 * Process a REPLSLOT message.
5441 * ----------
5442 */
5443 static void
pgstat_recv_replslot(PgStat_MsgReplSlot * msg,int len)5444 pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
5445 {
5446 if (msg->m_drop)
5447 {
5448 Assert(!msg->m_create);
5449
5450 /* Remove the replication slot statistics with the given name */
5451 if (replSlotStatHash != NULL)
5452 (void) hash_search(replSlotStatHash,
5453 (void *) &(msg->m_slotname),
5454 HASH_REMOVE,
5455 NULL);
5456 }
5457 else
5458 {
5459 PgStat_StatReplSlotEntry *slotent;
5460
5461 slotent = pgstat_get_replslot_entry(msg->m_slotname, true);
5462 Assert(slotent);
5463
5464 if (msg->m_create)
5465 {
5466 /*
5467 * If the message for dropping the slot with the same name gets
5468 * lost, slotent has stats for the old slot. So we initialize all
5469 * counters at slot creation.
5470 */
5471 pgstat_reset_replslot(slotent, 0);
5472 }
5473 else
5474 {
5475 /* Update the replication slot statistics */
5476 slotent->spill_txns += msg->m_spill_txns;
5477 slotent->spill_count += msg->m_spill_count;
5478 slotent->spill_bytes += msg->m_spill_bytes;
5479 slotent->stream_txns += msg->m_stream_txns;
5480 slotent->stream_count += msg->m_stream_count;
5481 slotent->stream_bytes += msg->m_stream_bytes;
5482 slotent->total_txns += msg->m_total_txns;
5483 slotent->total_bytes += msg->m_total_bytes;
5484 }
5485 }
5486 }
5487
5488 /* ----------
5489 * pgstat_recv_connect() -
5490 *
5491 * Process a CONNECT message.
5492 * ----------
5493 */
5494 static void
pgstat_recv_connect(PgStat_MsgConnect * msg,int len)5495 pgstat_recv_connect(PgStat_MsgConnect *msg, int len)
5496 {
5497 PgStat_StatDBEntry *dbentry;
5498
5499 dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5500 dbentry->n_sessions++;
5501 }
5502
5503 /* ----------
5504 * pgstat_recv_disconnect() -
5505 *
5506 * Process a DISCONNECT message.
5507 * ----------
5508 */
5509 static void
pgstat_recv_disconnect(PgStat_MsgDisconnect * msg,int len)5510 pgstat_recv_disconnect(PgStat_MsgDisconnect *msg, int len)
5511 {
5512 PgStat_StatDBEntry *dbentry;
5513
5514 dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5515
5516 switch (msg->m_cause)
5517 {
5518 case DISCONNECT_NOT_YET:
5519 case DISCONNECT_NORMAL:
5520 /* we don't collect these */
5521 break;
5522 case DISCONNECT_CLIENT_EOF:
5523 dbentry->n_sessions_abandoned++;
5524 break;
5525 case DISCONNECT_FATAL:
5526 dbentry->n_sessions_fatal++;
5527 break;
5528 case DISCONNECT_KILLED:
5529 dbentry->n_sessions_killed++;
5530 break;
5531 }
5532 }
5533
5534 /* ----------
5535 * pgstat_recv_tempfile() -
5536 *
5537 * Process a TEMPFILE message.
5538 * ----------
5539 */
5540 static void
pgstat_recv_tempfile(PgStat_MsgTempFile * msg,int len)5541 pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len)
5542 {
5543 PgStat_StatDBEntry *dbentry;
5544
5545 dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5546
5547 dbentry->n_temp_bytes += msg->m_filesize;
5548 dbentry->n_temp_files += 1;
5549 }
5550
5551 /* ----------
5552 * pgstat_recv_funcstat() -
5553 *
5554 * Count what the backend has done.
5555 * ----------
5556 */
5557 static void
pgstat_recv_funcstat(PgStat_MsgFuncstat * msg,int len)5558 pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len)
5559 {
5560 PgStat_FunctionEntry *funcmsg = &(msg->m_entry[0]);
5561 PgStat_StatDBEntry *dbentry;
5562 PgStat_StatFuncEntry *funcentry;
5563 int i;
5564 bool found;
5565
5566 dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
5567
5568 /*
5569 * Process all function entries in the message.
5570 */
5571 for (i = 0; i < msg->m_nentries; i++, funcmsg++)
5572 {
5573 funcentry = (PgStat_StatFuncEntry *) hash_search(dbentry->functions,
5574 (void *) &(funcmsg->f_id),
5575 HASH_ENTER, &found);
5576
5577 if (!found)
5578 {
5579 /*
5580 * If it's a new function entry, initialize counters to the values
5581 * we just got.
5582 */
5583 funcentry->f_numcalls = funcmsg->f_numcalls;
5584 funcentry->f_total_time = funcmsg->f_total_time;
5585 funcentry->f_self_time = funcmsg->f_self_time;
5586 }
5587 else
5588 {
5589 /*
5590 * Otherwise add the values to the existing entry.
5591 */
5592 funcentry->f_numcalls += funcmsg->f_numcalls;
5593 funcentry->f_total_time += funcmsg->f_total_time;
5594 funcentry->f_self_time += funcmsg->f_self_time;
5595 }
5596 }
5597 }
5598
5599 /* ----------
5600 * pgstat_recv_funcpurge() -
5601 *
5602 * Arrange for dead function removal.
5603 * ----------
5604 */
5605 static void
pgstat_recv_funcpurge(PgStat_MsgFuncpurge * msg,int len)5606 pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len)
5607 {
5608 PgStat_StatDBEntry *dbentry;
5609 int i;
5610
5611 dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
5612
5613 /*
5614 * No need to purge if we don't even know the database.
5615 */
5616 if (!dbentry || !dbentry->functions)
5617 return;
5618
5619 /*
5620 * Process all function entries in the message.
5621 */
5622 for (i = 0; i < msg->m_nentries; i++)
5623 {
5624 /* Remove from hashtable if present; we don't care if it's not. */
5625 (void) hash_search(dbentry->functions,
5626 (void *) &(msg->m_functionid[i]),
5627 HASH_REMOVE, NULL);
5628 }
5629 }
5630
5631 /* ----------
5632 * pgstat_write_statsfile_needed() -
5633 *
5634 * Do we need to write out any stats files?
5635 * ----------
5636 */
5637 static bool
pgstat_write_statsfile_needed(void)5638 pgstat_write_statsfile_needed(void)
5639 {
5640 if (pending_write_requests != NIL)
5641 return true;
5642
5643 /* Everything was written recently */
5644 return false;
5645 }
5646
5647 /* ----------
5648 * pgstat_db_requested() -
5649 *
5650 * Checks whether stats for a particular DB need to be written to a file.
5651 * ----------
5652 */
5653 static bool
pgstat_db_requested(Oid databaseid)5654 pgstat_db_requested(Oid databaseid)
5655 {
5656 /*
5657 * If any requests are outstanding at all, we should write the stats for
5658 * shared catalogs (the "database" with OID 0). This ensures that
5659 * backends will see up-to-date stats for shared catalogs, even though
5660 * they send inquiry messages mentioning only their own DB.
5661 */
5662 if (databaseid == InvalidOid && pending_write_requests != NIL)
5663 return true;
5664
5665 /* Search to see if there's an open request to write this database. */
5666 if (list_member_oid(pending_write_requests, databaseid))
5667 return true;
5668
5669 return false;
5670 }
5671
5672 /* ----------
5673 * pgstat_replslot_entry
5674 *
5675 * Return the entry of replication slot stats with the given name. Return
5676 * NULL if not found and the caller didn't request to create it.
5677 *
5678 * create tells whether to create the new slot entry if it is not found.
5679 * ----------
5680 */
5681 static PgStat_StatReplSlotEntry *
pgstat_get_replslot_entry(NameData name,bool create)5682 pgstat_get_replslot_entry(NameData name, bool create)
5683 {
5684 PgStat_StatReplSlotEntry *slotent;
5685 bool found;
5686
5687 if (replSlotStatHash == NULL)
5688 {
5689 HASHCTL hash_ctl;
5690
5691 /*
5692 * Quick return NULL if the hash table is empty and the caller didn't
5693 * request to create the entry.
5694 */
5695 if (!create)
5696 return NULL;
5697
5698 hash_ctl.keysize = sizeof(NameData);
5699 hash_ctl.entrysize = sizeof(PgStat_StatReplSlotEntry);
5700 replSlotStatHash = hash_create("Replication slots hash",
5701 PGSTAT_REPLSLOT_HASH_SIZE,
5702 &hash_ctl,
5703 HASH_ELEM | HASH_BLOBS);
5704 }
5705
5706 slotent = (PgStat_StatReplSlotEntry *) hash_search(replSlotStatHash,
5707 (void *) &name,
5708 create ? HASH_ENTER : HASH_FIND,
5709 &found);
5710
5711 if (!slotent)
5712 {
5713 /* not found */
5714 Assert(!create && !found);
5715 return NULL;
5716 }
5717
5718 /* initialize the entry */
5719 if (create && !found)
5720 {
5721 namestrcpy(&(slotent->slotname), NameStr(name));
5722 pgstat_reset_replslot(slotent, 0);
5723 }
5724
5725 return slotent;
5726 }
5727
5728 /* ----------
5729 * pgstat_reset_replslot
5730 *
5731 * Reset the given replication slot stats.
5732 * ----------
5733 */
5734 static void
pgstat_reset_replslot(PgStat_StatReplSlotEntry * slotent,TimestampTz ts)5735 pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotent, TimestampTz ts)
5736 {
5737 /* reset only counters. Don't clear slot name */
5738 slotent->spill_txns = 0;
5739 slotent->spill_count = 0;
5740 slotent->spill_bytes = 0;
5741 slotent->stream_txns = 0;
5742 slotent->stream_count = 0;
5743 slotent->stream_bytes = 0;
5744 slotent->total_txns = 0;
5745 slotent->total_bytes = 0;
5746 slotent->stat_reset_timestamp = ts;
5747 }
5748
5749 /*
5750 * pgstat_slru_index
5751 *
5752 * Determine index of entry for a SLRU with a given name. If there's no exact
5753 * match, returns index of the last "other" entry used for SLRUs defined in
5754 * external projects.
5755 */
5756 int
pgstat_slru_index(const char * name)5757 pgstat_slru_index(const char *name)
5758 {
5759 int i;
5760
5761 for (i = 0; i < SLRU_NUM_ELEMENTS; i++)
5762 {
5763 if (strcmp(slru_names[i], name) == 0)
5764 return i;
5765 }
5766
5767 /* return index of the last entry (which is the "other" one) */
5768 return (SLRU_NUM_ELEMENTS - 1);
5769 }
5770
5771 /*
5772 * pgstat_slru_name
5773 *
5774 * Returns SLRU name for an index. The index may be above SLRU_NUM_ELEMENTS,
5775 * in which case this returns NULL. This allows writing code that does not
5776 * know the number of entries in advance.
5777 */
5778 const char *
pgstat_slru_name(int slru_idx)5779 pgstat_slru_name(int slru_idx)
5780 {
5781 if (slru_idx < 0 || slru_idx >= SLRU_NUM_ELEMENTS)
5782 return NULL;
5783
5784 return slru_names[slru_idx];
5785 }
5786
5787 /*
5788 * slru_entry
5789 *
5790 * Returns pointer to entry with counters for given SLRU (based on the name
5791 * stored in SlruCtl as lwlock tranche name).
5792 */
5793 static inline PgStat_MsgSLRU *
slru_entry(int slru_idx)5794 slru_entry(int slru_idx)
5795 {
5796 /*
5797 * The postmaster should never register any SLRU statistics counts; if it
5798 * did, the counts would be duplicated into child processes via fork().
5799 */
5800 Assert(IsUnderPostmaster || !IsPostmasterEnvironment);
5801
5802 Assert((slru_idx >= 0) && (slru_idx < SLRU_NUM_ELEMENTS));
5803
5804 return &SLRUStats[slru_idx];
5805 }
5806
5807 /*
5808 * SLRU statistics count accumulation functions --- called from slru.c
5809 */
5810
5811 void
pgstat_count_slru_page_zeroed(int slru_idx)5812 pgstat_count_slru_page_zeroed(int slru_idx)
5813 {
5814 slru_entry(slru_idx)->m_blocks_zeroed += 1;
5815 }
5816
5817 void
pgstat_count_slru_page_hit(int slru_idx)5818 pgstat_count_slru_page_hit(int slru_idx)
5819 {
5820 slru_entry(slru_idx)->m_blocks_hit += 1;
5821 }
5822
5823 void
pgstat_count_slru_page_exists(int slru_idx)5824 pgstat_count_slru_page_exists(int slru_idx)
5825 {
5826 slru_entry(slru_idx)->m_blocks_exists += 1;
5827 }
5828
5829 void
pgstat_count_slru_page_read(int slru_idx)5830 pgstat_count_slru_page_read(int slru_idx)
5831 {
5832 slru_entry(slru_idx)->m_blocks_read += 1;
5833 }
5834
5835 void
pgstat_count_slru_page_written(int slru_idx)5836 pgstat_count_slru_page_written(int slru_idx)
5837 {
5838 slru_entry(slru_idx)->m_blocks_written += 1;
5839 }
5840
5841 void
pgstat_count_slru_flush(int slru_idx)5842 pgstat_count_slru_flush(int slru_idx)
5843 {
5844 slru_entry(slru_idx)->m_flush += 1;
5845 }
5846
5847 void
pgstat_count_slru_truncate(int slru_idx)5848 pgstat_count_slru_truncate(int slru_idx)
5849 {
5850 slru_entry(slru_idx)->m_truncate += 1;
5851 }
5852