/*-------------------------------------------------------------------------
 *
 * lwlock.c
 *	  Lightweight lock manager
 *
 * Lightweight locks are intended primarily to provide mutual exclusion of
 * access to shared-memory data structures.  Therefore, they offer both
 * exclusive and shared lock modes (to support read/write and read-only
 * access to a shared object).  There are few other frammishes.  User-level
 * locking should be done with the full lock manager --- which depends on
 * LWLocks to protect its shared state.
 *
 * In addition to exclusive and shared modes, lightweight locks can be used to
 * wait until a variable changes value.  The variable is initially not set
 * when the lock is acquired with LWLockAcquire, i.e. it remains set to the
 * value it was set to when the lock was last released, and can be updated
 * without releasing the lock by calling LWLockUpdateVar.  LWLockWaitForVar
 * waits for the variable to be updated, or until the lock is free.  When
 * releasing the lock with LWLockReleaseClearVar() the value can be set to an
 * appropriate value for a free lock.  The meaning of the variable is up to
 * the caller, the lightweight lock code just assigns and compares it.
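 *
 * For illustration, a minimal sketch of that protocol (the variable and
 * values here are hypothetical; the real user of this machinery is the
 * WAL-insert locking code in xlog.c):
 *
 *		uint64	progress;		-- the protected variable
 *
 *		-- holder side
 *		LWLockAcquire(lock, LW_EXCLUSIVE);
 *		LWLockUpdateVar(lock, &progress, 42);	-- wakes LWLockWaitForVar waiters
 *		LWLockReleaseClearVar(lock, &progress, 0);	-- reset for a free lock
 *
 *		-- waiter side
 *		uint64	newval;
 *		if (LWLockWaitForVar(lock, &progress, oldval, &newval))
 *			-- the lock is now free
 *		else
 *			-- still held, but progress changed; newval has the current value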
 *
 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/storage/lmgr/lwlock.c
 *
 * NOTES:
 *
 * This used to be a pretty straightforward reader-writer lock
 * implementation, in which the internal state was protected by a
 * spinlock.  Unfortunately the overhead of taking the spinlock proved to be
 * too high for workloads/locks that were taken in shared mode very
 * frequently.  Often we were spinning in the (obviously exclusive) spinlock,
 * while trying to acquire a shared lock that was actually free.
 *
 * Thus a new implementation was devised that provides wait-free shared lock
 * acquisition for locks that aren't exclusively locked.
 *
 * The basic idea is to have a single atomic variable 'lockcount' instead of
 * the formerly separate shared and exclusive counters and to use atomic
 * operations to acquire the lock.  That's fairly easy to do for plain
 * rw-spinlocks, but a lot harder for something like LWLocks that want to wait
 * in the OS.
 *
 * For lock acquisition we use an atomic compare-and-exchange on the lockcount
 * variable.  For exclusive lock we swap in a sentinel value
 * (LW_VAL_EXCLUSIVE), for shared locks we count the number of holders.
 *
 * To release the lock we use an atomic decrement.  If the new value is zero
 * (we get that atomically), we know we can/have to release waiters.
 *
 * Obviously it is important that the sentinel value for exclusive locks
 * doesn't conflict with the maximum number of possible shared lockers -
 * luckily MAX_BACKENDS makes that easily possible.
 *
 *
 * The attentive reader might have noticed that naively doing the above has a
 * glaring race condition: We try to lock using the atomic operations and
 * notice that we have to wait.  Unfortunately by the time we have finished
 * queuing, the former locker very well might have already finished its
 * work.  That's problematic because we're now stuck waiting inside the OS.
 *
 * To mitigate those races we use a two-phased attempt at locking:
 *	 Phase 1: Try to do it atomically, if we succeed, nice
 *	 Phase 2: Add ourselves to the waitqueue of the lock
 *	 Phase 3: Try to grab the lock again, if we succeed, remove ourselves from
 *			  the queue
 *	 Phase 4: Sleep till wake-up, goto Phase 1
 *
 * This protects us against the problem from above as nobody can release too
 * quickly, before we're queued, since after Phase 2 we're already queued.
 * -------------------------------------------------------------------------
 */
#include "postgres.h"

#include "miscadmin.h"
#include "pg_trace.h"
#include "pgstat.h"
#include "postmaster/postmaster.h"
#include "replication/slot.h"
#include "storage/ipc.h"
#include "storage/predicate.h"
#include "storage/proc.h"
#include "storage/proclist.h"
#include "storage/spin.h"
#include "utils/memutils.h"

#ifdef LWLOCK_STATS
#include "utils/hsearch.h"
#endif


/* We use the ShmemLock spinlock to protect LWLockCounter */
extern slock_t *ShmemLock;

#define LW_FLAG_HAS_WAITERS		((uint32) 1 << 30)
#define LW_FLAG_RELEASE_OK		((uint32) 1 << 29)
#define LW_FLAG_LOCKED			((uint32) 1 << 28)

#define LW_VAL_EXCLUSIVE		((uint32) 1 << 24)
#define LW_VAL_SHARED			1

#define LW_LOCK_MASK			((uint32) ((1 << 25)-1))
/* Must be greater than MAX_BACKENDS - which is 2^23-1, so we're fine. */
#define LW_SHARED_MASK			((uint32) ((1 << 24)-1))
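
/*
 * For orientation, a sketch of how the bits of ->state decompose under the
 * definitions above (the flag bits are this file's internal bookkeeping):
 *
 *	 bit 30		LW_FLAG_HAS_WAITERS		somebody is enqueued on the lock
 *	 bit 29		LW_FLAG_RELEASE_OK		releaser may wake up waiters
 *	 bit 28		LW_FLAG_LOCKED			the wait list itself is locked
 *	 bit 24		LW_VAL_EXCLUSIVE		lock is held exclusively
 *	 bits 0-23	shared-holder count		each holder adds LW_VAL_SHARED
 *
 * E.g. a state of (LW_FLAG_RELEASE_OK | 3) means: exclusive bit clear,
 * three shared holders, nobody waiting.
 */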

/*
 * There are three sorts of LWLock "tranches":
 *
 * 1. The individually-named locks defined in lwlocknames.h each have their
 * own tranche.  The names of these tranches appear in IndividualLWLockNames[]
 * in lwlocknames.c.
 *
 * 2. There are some predefined tranches for built-in groups of locks.
 * These are listed in enum BuiltinTrancheIds in lwlock.h, and their names
 * appear in BuiltinTrancheNames[] below.
 *
 * 3. Extensions can create new tranches, via either RequestNamedLWLockTranche
 * or LWLockRegisterTranche.  The names of these that are known in the current
 * process appear in LWLockTrancheNames[].
 *
 * All these names are user-visible as wait event names, so choose with care
 * ... and do not forget to update the documentation's list of wait events.
 */
extern const char *const IndividualLWLockNames[];	/* in lwlocknames.c */

static const char *const BuiltinTrancheNames[] = {
    /* LWTRANCHE_XACT_BUFFER: */
    "XactBuffer",
    /* LWTRANCHE_COMMITTS_BUFFER: */
    "CommitTSBuffer",
    /* LWTRANCHE_SUBTRANS_BUFFER: */
    "SubtransBuffer",
    /* LWTRANCHE_MULTIXACTOFFSET_BUFFER: */
    "MultiXactOffsetBuffer",
    /* LWTRANCHE_MULTIXACTMEMBER_BUFFER: */
    "MultiXactMemberBuffer",
    /* LWTRANCHE_NOTIFY_BUFFER: */
    "NotifyBuffer",
    /* LWTRANCHE_SERIAL_BUFFER: */
    "SerialBuffer",
    /* LWTRANCHE_WAL_INSERT: */
    "WALInsert",
    /* LWTRANCHE_BUFFER_CONTENT: */
    "BufferContent",
    /* LWTRANCHE_REPLICATION_ORIGIN_STATE: */
    "ReplicationOriginState",
    /* LWTRANCHE_REPLICATION_SLOT_IO: */
    "ReplicationSlotIO",
    /* LWTRANCHE_LOCK_FASTPATH: */
    "LockFastPath",
    /* LWTRANCHE_BUFFER_MAPPING: */
    "BufferMapping",
    /* LWTRANCHE_LOCK_MANAGER: */
    "LockManager",
    /* LWTRANCHE_PREDICATE_LOCK_MANAGER: */
    "PredicateLockManager",
    /* LWTRANCHE_PARALLEL_HASH_JOIN: */
    "ParallelHashJoin",
    /* LWTRANCHE_PARALLEL_QUERY_DSA: */
    "ParallelQueryDSA",
    /* LWTRANCHE_PER_SESSION_DSA: */
    "PerSessionDSA",
    /* LWTRANCHE_PER_SESSION_RECORD_TYPE: */
    "PerSessionRecordType",
    /* LWTRANCHE_PER_SESSION_RECORD_TYPMOD: */
    "PerSessionRecordTypmod",
    /* LWTRANCHE_SHARED_TUPLESTORE: */
    "SharedTupleStore",
    /* LWTRANCHE_SHARED_TIDBITMAP: */
    "SharedTidBitmap",
    /* LWTRANCHE_PARALLEL_APPEND: */
    "ParallelAppend",
    /* LWTRANCHE_PER_XACT_PREDICATE_LIST: */
    "PerXactPredicateList"
};

StaticAssertDecl(lengthof(BuiltinTrancheNames) ==
                 LWTRANCHE_FIRST_USER_DEFINED - NUM_INDIVIDUAL_LWLOCKS,
                 "missing entries in BuiltinTrancheNames[]");

/*
 * This is indexed by tranche ID minus LWTRANCHE_FIRST_USER_DEFINED, and
 * stores the names of all dynamically-created tranches known to the current
 * process.  Any unused entries in the array will contain NULL.
 */
static const char **LWLockTrancheNames = NULL;
static int  LWLockTrancheNamesAllocated = 0;

/*
 * This points to the main array of LWLocks in shared memory.  Backends inherit
 * the pointer by fork from the postmaster (except in the EXEC_BACKEND case,
 * where we have special measures to pass it down).
 */
LWLockPadded *MainLWLockArray = NULL;

/*
 * We use this structure to keep track of locked LWLocks for release
 * during error recovery.  Normally, only a few will be held at once, but
 * occasionally the number can be much higher; for example, the pg_buffercache
 * extension locks all buffer partitions simultaneously.
 */
#define MAX_SIMUL_LWLOCKS	200

/* struct representing the LWLocks we're holding */
typedef struct LWLockHandle
{
    LWLock     *lock;
    LWLockMode  mode;
} LWLockHandle;

static int  num_held_lwlocks = 0;
static LWLockHandle held_lwlocks[MAX_SIMUL_LWLOCKS];

/* struct representing the LWLock tranche request for named tranche */
typedef struct NamedLWLockTrancheRequest
{
    char        tranche_name[NAMEDATALEN];
    int         num_lwlocks;
} NamedLWLockTrancheRequest;

static NamedLWLockTrancheRequest *NamedLWLockTrancheRequestArray = NULL;
static int  NamedLWLockTrancheRequestsAllocated = 0;

/*
 * NamedLWLockTrancheRequests is both the valid length of the request array,
 * and the length of the shared-memory NamedLWLockTrancheArray later on.
 * This variable and NamedLWLockTrancheArray are non-static so that
 * postmaster.c can copy them to child processes in EXEC_BACKEND builds.
 */
int         NamedLWLockTrancheRequests = 0;

/* points to data in shared memory: */
NamedLWLockTranche *NamedLWLockTrancheArray = NULL;

static bool lock_named_request_allowed = true;

static void InitializeLWLocks(void);
static inline void LWLockReportWaitStart(LWLock *lock);
static inline void LWLockReportWaitEnd(void);
static const char *GetLWTrancheName(uint16 trancheId);

#define T_NAME(lock) \
    GetLWTrancheName((lock)->tranche)

#ifdef LWLOCK_STATS
typedef struct lwlock_stats_key
{
    int         tranche;
    void       *instance;
} lwlock_stats_key;

typedef struct lwlock_stats
{
    lwlock_stats_key key;
    int         sh_acquire_count;
    int         ex_acquire_count;
    int         block_count;
    int         dequeue_self_count;
    int         spin_delay_count;
} lwlock_stats;

static HTAB *lwlock_stats_htab;
static lwlock_stats lwlock_stats_dummy;
#endif

#ifdef LOCK_DEBUG
bool        Trace_lwlocks = false;

inline static void
PRINT_LWDEBUG(const char *where, LWLock *lock, LWLockMode mode)
{
    /* hide statement & context here, otherwise the log is just too verbose */
    if (Trace_lwlocks)
    {
        uint32      state = pg_atomic_read_u32(&lock->state);

        ereport(LOG,
                (errhidestmt(true),
                 errhidecontext(true),
                 errmsg_internal("%d: %s(%s %p): excl %u shared %u haswaiters %u waiters %u rOK %d",
                                 MyProcPid,
                                 where, T_NAME(lock), lock,
                                 (state & LW_VAL_EXCLUSIVE) != 0,
                                 state & LW_SHARED_MASK,
                                 (state & LW_FLAG_HAS_WAITERS) != 0,
                                 pg_atomic_read_u32(&lock->nwaiters),
                                 (state & LW_FLAG_RELEASE_OK) != 0)));
    }
}

inline static void
LOG_LWDEBUG(const char *where, LWLock *lock, const char *msg)
{
    /* hide statement & context here, otherwise the log is just too verbose */
    if (Trace_lwlocks)
    {
        ereport(LOG,
                (errhidestmt(true),
                 errhidecontext(true),
                 errmsg_internal("%s(%s %p): %s", where,
                                 T_NAME(lock), lock, msg)));
    }
}

#else                           /* not LOCK_DEBUG */
#define PRINT_LWDEBUG(a,b,c) ((void)0)
#define LOG_LWDEBUG(a,b,c) ((void)0)
#endif                          /* LOCK_DEBUG */

#ifdef LWLOCK_STATS

static void init_lwlock_stats(void);
static void print_lwlock_stats(int code, Datum arg);
static lwlock_stats *get_lwlock_stats_entry(LWLock *lock);

static void
init_lwlock_stats(void)
{
    HASHCTL     ctl;
    static MemoryContext lwlock_stats_cxt = NULL;
    static bool exit_registered = false;

    if (lwlock_stats_cxt != NULL)
        MemoryContextDelete(lwlock_stats_cxt);

    /*
     * The LWLock stats will be updated within a critical section, which
     * requires allocating new hash entries. Allocations within a critical
     * section are normally not allowed because running out of memory would
     * lead to a PANIC, but LWLOCK_STATS is debugging code that's not normally
     * turned on in production, so that's an acceptable risk. The hash entries
     * are small, so the risk of running out of memory is minimal in practice.
     */
    lwlock_stats_cxt = AllocSetContextCreate(TopMemoryContext,
                                             "LWLock stats",
                                             ALLOCSET_DEFAULT_SIZES);
    MemoryContextAllowInCriticalSection(lwlock_stats_cxt, true);

    ctl.keysize = sizeof(lwlock_stats_key);
    ctl.entrysize = sizeof(lwlock_stats);
    ctl.hcxt = lwlock_stats_cxt;
    lwlock_stats_htab = hash_create("lwlock stats", 16384, &ctl,
                                    HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
    if (!exit_registered)
    {
        on_shmem_exit(print_lwlock_stats, 0);
        exit_registered = true;
    }
}

static void
print_lwlock_stats(int code, Datum arg)
{
    HASH_SEQ_STATUS scan;
    lwlock_stats *lwstats;

    hash_seq_init(&scan, lwlock_stats_htab);

    /* Grab an LWLock to keep different backends from mixing reports */
    LWLockAcquire(&MainLWLockArray[0].lock, LW_EXCLUSIVE);

    while ((lwstats = (lwlock_stats *) hash_seq_search(&scan)) != NULL)
    {
        fprintf(stderr,
                "PID %d lwlock %s %p: shacq %u exacq %u blk %u spindelay %u dequeue self %u\n",
                MyProcPid, GetLWTrancheName(lwstats->key.tranche),
                lwstats->key.instance, lwstats->sh_acquire_count,
                lwstats->ex_acquire_count, lwstats->block_count,
                lwstats->spin_delay_count, lwstats->dequeue_self_count);
    }

    LWLockRelease(&MainLWLockArray[0].lock);
}

static lwlock_stats *
get_lwlock_stats_entry(LWLock *lock)
{
    lwlock_stats_key key;
    lwlock_stats *lwstats;
    bool        found;

    /*
     * During shared memory initialization, the hash table doesn't exist yet.
     * Stats of that phase aren't very interesting, so just collect operations
     * on all locks in a single dummy entry.
     */
    if (lwlock_stats_htab == NULL)
        return &lwlock_stats_dummy;

    /* Fetch or create the entry. */
    MemSet(&key, 0, sizeof(key));
    key.tranche = lock->tranche;
    key.instance = lock;
    lwstats = hash_search(lwlock_stats_htab, &key, HASH_ENTER, &found);
    if (!found)
    {
        lwstats->sh_acquire_count = 0;
        lwstats->ex_acquire_count = 0;
        lwstats->block_count = 0;
        lwstats->dequeue_self_count = 0;
        lwstats->spin_delay_count = 0;
    }
    return lwstats;
}
#endif                          /* LWLOCK_STATS */


/*
 * Compute number of LWLocks required by named tranches.  These will be
 * allocated in the main array.
 */
static int
NumLWLocksForNamedTranches(void)
{
    int         numLocks = 0;
    int         i;

    for (i = 0; i < NamedLWLockTrancheRequests; i++)
        numLocks += NamedLWLockTrancheRequestArray[i].num_lwlocks;

    return numLocks;
}

/*
 * Compute shmem space needed for LWLocks and named tranches.
 */
Size
LWLockShmemSize(void)
{
    Size        size;
    int         i;
    int         numLocks = NUM_FIXED_LWLOCKS;

    /* Calculate total number of locks needed in the main array. */
    numLocks += NumLWLocksForNamedTranches();

    /* Space for the LWLock array. */
    size = mul_size(numLocks, sizeof(LWLockPadded));

    /* Space for dynamic allocation counter, plus room for alignment. */
    size = add_size(size, sizeof(int) + LWLOCK_PADDED_SIZE);

    /* space for named tranches. */
    size = add_size(size, mul_size(NamedLWLockTrancheRequests, sizeof(NamedLWLockTranche)));

    /* space for name of each tranche. */
    for (i = 0; i < NamedLWLockTrancheRequests; i++)
        size = add_size(size, strlen(NamedLWLockTrancheRequestArray[i].tranche_name) + 1);

    /* Disallow adding any more named tranches. */
    lock_named_request_allowed = false;

    return size;
}

/*
 * Allocate shmem space for the main LWLock array and all tranches and
 * initialize it.  We also register extension LWLock tranches here.
 */
void
CreateLWLocks(void)
{
    StaticAssertStmt(LW_VAL_EXCLUSIVE > (uint32) MAX_BACKENDS,
                     "MAX_BACKENDS too big for lwlock.c");

    StaticAssertStmt(sizeof(LWLock) <= LWLOCK_PADDED_SIZE,
                     "Miscalculated LWLock padding");

    if (!IsUnderPostmaster)
    {
        Size        spaceLocks = LWLockShmemSize();
        int        *LWLockCounter;
        char       *ptr;

        /* Allocate space */
        ptr = (char *) ShmemAlloc(spaceLocks);

        /* Leave room for dynamic allocation of tranches */
        ptr += sizeof(int);

        /* Ensure desired alignment of LWLock array */
        ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE;

        MainLWLockArray = (LWLockPadded *) ptr;

        /*
         * Initialize the dynamic-allocation counter for tranches, which is
         * stored just before the first LWLock.
         */
        LWLockCounter = (int *) ((char *) MainLWLockArray - sizeof(int));
        *LWLockCounter = LWTRANCHE_FIRST_USER_DEFINED;

        /* Initialize all LWLocks */
        InitializeLWLocks();
    }

    /* Register named extension LWLock tranches in the current process. */
    for (int i = 0; i < NamedLWLockTrancheRequests; i++)
        LWLockRegisterTranche(NamedLWLockTrancheArray[i].trancheId,
                              NamedLWLockTrancheArray[i].trancheName);
}

/*
 * Initialize LWLocks that are fixed and those belonging to named tranches.
 */
static void
InitializeLWLocks(void)
{
    int         numNamedLocks = NumLWLocksForNamedTranches();
    int         id;
    int         i;
    int         j;
    LWLockPadded *lock;

    /* Initialize all individual LWLocks in main array */
    for (id = 0, lock = MainLWLockArray; id < NUM_INDIVIDUAL_LWLOCKS; id++, lock++)
        LWLockInitialize(&lock->lock, id);

    /* Initialize buffer mapping LWLocks in main array */
    lock = MainLWLockArray + BUFFER_MAPPING_LWLOCK_OFFSET;
    for (id = 0; id < NUM_BUFFER_PARTITIONS; id++, lock++)
        LWLockInitialize(&lock->lock, LWTRANCHE_BUFFER_MAPPING);

    /* Initialize lmgrs' LWLocks in main array */
    lock = MainLWLockArray + LOCK_MANAGER_LWLOCK_OFFSET;
    for (id = 0; id < NUM_LOCK_PARTITIONS; id++, lock++)
        LWLockInitialize(&lock->lock, LWTRANCHE_LOCK_MANAGER);

    /* Initialize predicate lmgrs' LWLocks in main array */
    lock = MainLWLockArray + PREDICATELOCK_MANAGER_LWLOCK_OFFSET;
    for (id = 0; id < NUM_PREDICATELOCK_PARTITIONS; id++, lock++)
        LWLockInitialize(&lock->lock, LWTRANCHE_PREDICATE_LOCK_MANAGER);

    /*
     * Copy the info about any named tranches into shared memory (so that
     * other processes can see it), and initialize the requested LWLocks.
     */
    if (NamedLWLockTrancheRequests > 0)
    {
        char       *trancheNames;

        NamedLWLockTrancheArray = (NamedLWLockTranche *)
            &MainLWLockArray[NUM_FIXED_LWLOCKS + numNamedLocks];

        trancheNames = (char *) NamedLWLockTrancheArray +
            (NamedLWLockTrancheRequests * sizeof(NamedLWLockTranche));
        lock = &MainLWLockArray[NUM_FIXED_LWLOCKS];

        for (i = 0; i < NamedLWLockTrancheRequests; i++)
        {
            NamedLWLockTrancheRequest *request;
            NamedLWLockTranche *tranche;
            char       *name;

            request = &NamedLWLockTrancheRequestArray[i];
            tranche = &NamedLWLockTrancheArray[i];

            name = trancheNames;
            trancheNames += strlen(request->tranche_name) + 1;
            strcpy(name, request->tranche_name);
            tranche->trancheId = LWLockNewTrancheId();
            tranche->trancheName = name;

            for (j = 0; j < request->num_lwlocks; j++, lock++)
                LWLockInitialize(&lock->lock, tranche->trancheId);
        }
    }
}

/*
 * InitLWLockAccess - initialize backend-local state needed to hold LWLocks
 */
void
InitLWLockAccess(void)
{
#ifdef LWLOCK_STATS
    init_lwlock_stats();
#endif
}

/*
 * GetNamedLWLockTranche - returns the base address of the LWLocks belonging
 *		to the specified tranche.
 *
 * The caller needs to retrieve the requested number of LWLocks starting from
 * the base lock address returned by this API.  This can be used for tranches
 * that were requested via the RequestNamedLWLockTranche() API.
 */
LWLockPadded *
GetNamedLWLockTranche(const char *tranche_name)
{
    int         lock_pos;
    int         i;

    /*
     * Obtain the position of the base address of the LWLocks belonging to the
     * requested tranche_name in MainLWLockArray.  LWLocks for named tranches
     * are placed in MainLWLockArray after the fixed locks.
     */
    lock_pos = NUM_FIXED_LWLOCKS;
    for (i = 0; i < NamedLWLockTrancheRequests; i++)
    {
        if (strcmp(NamedLWLockTrancheRequestArray[i].tranche_name,
                   tranche_name) == 0)
            return &MainLWLockArray[lock_pos];

        lock_pos += NamedLWLockTrancheRequestArray[i].num_lwlocks;
    }

    elog(ERROR, "requested tranche is not registered");

    /* just to keep compiler quiet */
    return NULL;
}
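
/*
 * For illustration, the usual pairing of RequestNamedLWLockTranche() and
 * GetNamedLWLockTranche() in an extension loaded via
 * shared_preload_libraries (a sketch only; "my_ext" and the surrounding
 * hook code are hypothetical):
 *
 *		-- in _PG_init(), before shared memory is created
 *		RequestNamedLWLockTranche("my_ext", 2);
 *
 *		-- later, e.g. from the extension's shmem_startup_hook
 *		LWLockPadded *locks = GetNamedLWLockTranche("my_ext");
 *
 *		LWLockAcquire(&locks[0].lock, LW_EXCLUSIVE);
 *		-- ... initialize or update the extension's shared state ...
 *		LWLockRelease(&locks[0].lock);
 */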

/*
 * Allocate a new tranche ID.
 */
int
LWLockNewTrancheId(void)
{
    int         result;
    int        *LWLockCounter;

    LWLockCounter = (int *) ((char *) MainLWLockArray - sizeof(int));
    SpinLockAcquire(ShmemLock);
    result = (*LWLockCounter)++;
    SpinLockRelease(ShmemLock);

    return result;
}

/*
 * Register a dynamic tranche name in the lookup table of the current process.
 *
 * This routine will save a pointer to the tranche name passed as an argument,
 * so the name should be allocated in a backend-lifetime context
 * (shared memory, TopMemoryContext, static constant, or similar).
 *
 * The tranche name will be user-visible as a wait event name, so try to
 * use a name that fits the style for those.
 */
void
LWLockRegisterTranche(int tranche_id, const char *tranche_name)
{
    /* This should only be called for user-defined tranches. */
    if (tranche_id < LWTRANCHE_FIRST_USER_DEFINED)
        return;

    /* Convert to array index. */
    tranche_id -= LWTRANCHE_FIRST_USER_DEFINED;

    /* If necessary, create or enlarge array. */
    if (tranche_id >= LWLockTrancheNamesAllocated)
    {
        int         newalloc;

        newalloc = Max(LWLockTrancheNamesAllocated, 8);
        while (newalloc <= tranche_id)
            newalloc *= 2;

        if (LWLockTrancheNames == NULL)
            LWLockTrancheNames = (const char **)
                MemoryContextAllocZero(TopMemoryContext,
                                       newalloc * sizeof(char *));
        else
        {
            LWLockTrancheNames = (const char **)
                repalloc(LWLockTrancheNames, newalloc * sizeof(char *));
            memset(LWLockTrancheNames + LWLockTrancheNamesAllocated,
                   0,
                   (newalloc - LWLockTrancheNamesAllocated) * sizeof(char *));
        }
        LWLockTrancheNamesAllocated = newalloc;
    }

    LWLockTrancheNames[tranche_id] = tranche_name;
}
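
/*
 * For illustration, the dynamic-registration alternative to named tranche
 * requests (a sketch; the tranche name and variables are hypothetical).  An
 * extension that lays out its own shared memory can mint a tranche ID and
 * teach the current process its name like this:
 *
 *		static int	my_tranche_id;
 *		static LWLock *my_lock;		-- points into the extension's shmem
 *
 *		my_tranche_id = LWLockNewTrancheId();
 *		LWLockRegisterTranche(my_tranche_id, "MyExtLock");
 *		LWLockInitialize(my_lock, my_tranche_id);
 *
 * Any other backend that wants the name (rather than "extension") reported
 * as its wait event must call LWLockRegisterTranche() itself.
 */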

/*
 * RequestNamedLWLockTranche
 *		Request that extra LWLocks be allocated during postmaster
 *		startup.
 *
 * This is only useful for extensions if called from the _PG_init hook
 * of a library that is loaded into the postmaster via
 * shared_preload_libraries.  Once shared memory has been allocated, calls
 * will be ignored.  (We could raise an error, but it seems better to make
 * it a no-op, so that libraries containing such calls can be reloaded if
 * needed.)
 *
 * The tranche name will be user-visible as a wait event name, so try to
 * use a name that fits the style for those.
 */
void
RequestNamedLWLockTranche(const char *tranche_name, int num_lwlocks)
{
    NamedLWLockTrancheRequest *request;

    if (IsUnderPostmaster || !lock_named_request_allowed)
        return;                 /* too late */

    if (NamedLWLockTrancheRequestArray == NULL)
    {
        NamedLWLockTrancheRequestsAllocated = 16;
        NamedLWLockTrancheRequestArray = (NamedLWLockTrancheRequest *)
            MemoryContextAlloc(TopMemoryContext,
                               NamedLWLockTrancheRequestsAllocated
                               * sizeof(NamedLWLockTrancheRequest));
    }

    if (NamedLWLockTrancheRequests >= NamedLWLockTrancheRequestsAllocated)
    {
        int         i = NamedLWLockTrancheRequestsAllocated;

        while (i <= NamedLWLockTrancheRequests)
            i *= 2;

        NamedLWLockTrancheRequestArray = (NamedLWLockTrancheRequest *)
            repalloc(NamedLWLockTrancheRequestArray,
                     i * sizeof(NamedLWLockTrancheRequest));
        NamedLWLockTrancheRequestsAllocated = i;
    }

    request = &NamedLWLockTrancheRequestArray[NamedLWLockTrancheRequests];
    Assert(strlen(tranche_name) + 1 <= NAMEDATALEN);
    strlcpy(request->tranche_name, tranche_name, NAMEDATALEN);
    request->num_lwlocks = num_lwlocks;
    NamedLWLockTrancheRequests++;
}

/*
 * LWLockInitialize - initialize a new lwlock; it's initially unlocked
 */
void
LWLockInitialize(LWLock *lock, int tranche_id)
{
    pg_atomic_init_u32(&lock->state, LW_FLAG_RELEASE_OK);
#ifdef LOCK_DEBUG
    pg_atomic_init_u32(&lock->nwaiters, 0);
#endif
    lock->tranche = tranche_id;
    proclist_init(&lock->waiters);
}

/*
 * Report start of wait event for light-weight locks.
 *
 * This function will be used by all the light-weight lock calls which
 * need to wait to acquire the lock.  This function distinguishes the wait
 * event based on tranche and lock id.
 */
static inline void
LWLockReportWaitStart(LWLock *lock)
{
    pgstat_report_wait_start(PG_WAIT_LWLOCK | lock->tranche);
}

/*
 * Report end of wait event for light-weight locks.
 */
static inline void
LWLockReportWaitEnd(void)
{
    pgstat_report_wait_end();
}

/*
 * Return the name of an LWLock tranche.
 */
static const char *
GetLWTrancheName(uint16 trancheId)
{
    /* Individual LWLock? */
    if (trancheId < NUM_INDIVIDUAL_LWLOCKS)
        return IndividualLWLockNames[trancheId];

    /* Built-in tranche? */
    if (trancheId < LWTRANCHE_FIRST_USER_DEFINED)
        return BuiltinTrancheNames[trancheId - NUM_INDIVIDUAL_LWLOCKS];

    /*
     * It's an extension tranche, so look in LWLockTrancheNames[].  However,
     * it's possible that the tranche has never been registered in the current
     * process, in which case give up and return "extension".
     */
    trancheId -= LWTRANCHE_FIRST_USER_DEFINED;

    if (trancheId >= LWLockTrancheNamesAllocated ||
        LWLockTrancheNames[trancheId] == NULL)
        return "extension";

    return LWLockTrancheNames[trancheId];
}

/*
 * Return an identifier for an LWLock based on the wait class and event.
 */
const char *
GetLWLockIdentifier(uint32 classId, uint16 eventId)
{
    Assert(classId == PG_WAIT_LWLOCK);
    /* The event IDs are just tranche numbers. */
    return GetLWTrancheName(eventId);
}

/*
 * Internal function that tries to atomically acquire the lwlock in the passed
 * in mode.
 *
 * This function will not block waiting for a lock to become free - that's the
 * caller's job.
 *
 * Returns true if the lock isn't free and we need to wait.
 */
static bool
LWLockAttemptLock(LWLock *lock, LWLockMode mode)
{
    uint32      old_state;

    AssertArg(mode == LW_EXCLUSIVE || mode == LW_SHARED);

    /*
     * Read once outside the loop, later iterations will get the newer value
     * via compare & exchange.
     */
    old_state = pg_atomic_read_u32(&lock->state);

    /* loop until we've determined whether we could acquire the lock or not */
    while (true)
    {
        uint32      desired_state;
        bool        lock_free;

        desired_state = old_state;

        if (mode == LW_EXCLUSIVE)
        {
            lock_free = (old_state & LW_LOCK_MASK) == 0;
            if (lock_free)
                desired_state += LW_VAL_EXCLUSIVE;
        }
        else
        {
            lock_free = (old_state & LW_VAL_EXCLUSIVE) == 0;
            if (lock_free)
                desired_state += LW_VAL_SHARED;
        }

        /*
         * Attempt to swap in the state we are expecting.  If we didn't see
         * the lock as free, that's just the old value.  If we saw it as free,
         * we'll attempt to mark it acquired.  The reason that we always swap
         * in the value is that this doubles as a memory barrier.  We could
         * try to be smarter and only swap in values if we saw the lock as
         * free, but benchmarks haven't shown it to be beneficial so far.
         *
         * Retry if the value changed since we last looked at it.
         */
        if (pg_atomic_compare_exchange_u32(&lock->state,
                                           &old_state, desired_state))
        {
            if (lock_free)
            {
                /* Great!  Got the lock. */
#ifdef LOCK_DEBUG
                if (mode == LW_EXCLUSIVE)
                    lock->owner = MyProc;
#endif
                return false;
            }
            else
                return true;    /* somebody else has the lock */
        }
    }
    pg_unreachable();
}

/*
 * Lock the LWLock's wait list against concurrent activity.
 *
 * NB: even though the wait list is locked, non-conflicting lock operations
 * may still happen concurrently.
 *
 * Time spent holding mutex should be short!
 */
static void
LWLockWaitListLock(LWLock *lock)
{
    uint32      old_state;
#ifdef LWLOCK_STATS
    lwlock_stats *lwstats;
    uint32      delays = 0;

    lwstats = get_lwlock_stats_entry(lock);
#endif

    while (true)
    {
        /* always try once to acquire lock directly */
        old_state = pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_LOCKED);
        if (!(old_state & LW_FLAG_LOCKED))
            break;              /* got lock */

        /* and then spin without atomic operations until lock is released */
        {
            SpinDelayStatus delayStatus;

            init_local_spin_delay(&delayStatus);

            while (old_state & LW_FLAG_LOCKED)
            {
                perform_spin_delay(&delayStatus);
                old_state = pg_atomic_read_u32(&lock->state);
            }
#ifdef LWLOCK_STATS
            delays += delayStatus.delays;
#endif
            finish_spin_delay(&delayStatus);
        }

        /*
         * Retry.  The lock might well have been re-acquired by somebody else
         * by the time we attempt to get it again.
         */
    }

#ifdef LWLOCK_STATS
    lwstats->spin_delay_count += delays;
#endif
}

/*
 * Unlock the LWLock's wait list.
 *
 * Note that it can be more efficient to manipulate flags and release the
 * locks in a single atomic operation.
 */
static void
LWLockWaitListUnlock(LWLock *lock)
{
    uint32      old_state PG_USED_FOR_ASSERTS_ONLY;

    old_state = pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_LOCKED);

    Assert(old_state & LW_FLAG_LOCKED);
}

/*
 * Wakeup all the lockers that currently have a chance to acquire the lock.
 */
static void
LWLockWakeup(LWLock *lock)
{
    bool        new_release_ok;
    bool        wokeup_somebody = false;
    proclist_head wakeup;
    proclist_mutable_iter iter;

    proclist_init(&wakeup);

    new_release_ok = true;

    /* lock wait list while collecting backends to wake up */
    LWLockWaitListLock(lock);

    proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
    {
        PGPROC     *waiter = GetPGProcByNumber(iter.cur);

        if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE)
            continue;

        proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
        proclist_push_tail(&wakeup, iter.cur, lwWaitLink);

        if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
        {
            /*
             * Prevent additional wakeups until the retryer gets to run.
             * Backends that are just waiting for the lock to become free
             * don't retry automatically.
             */
            new_release_ok = false;

            /*
             * Don't wake up (further) exclusive lockers.
             */
            wokeup_somebody = true;
        }

        /*
         * Once we've woken up an exclusive locker, there's no point in waking
         * up anybody else.
         */
        if (waiter->lwWaitMode == LW_EXCLUSIVE)
            break;
    }

    Assert(proclist_is_empty(&wakeup) || pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS);

    /* unset required flags, and release lock, in one fell swoop */
    {
        uint32      old_state;
        uint32      desired_state;

        old_state = pg_atomic_read_u32(&lock->state);
        while (true)
        {
            desired_state = old_state;

            /* compute desired flags */

            if (new_release_ok)
                desired_state |= LW_FLAG_RELEASE_OK;
            else
                desired_state &= ~LW_FLAG_RELEASE_OK;

            if (proclist_is_empty(&wakeup))
                desired_state &= ~LW_FLAG_HAS_WAITERS;

            desired_state &= ~LW_FLAG_LOCKED;   /* release lock */

            if (pg_atomic_compare_exchange_u32(&lock->state, &old_state,
                                               desired_state))
                break;
        }
    }

    /* Awaken any waiters I removed from the queue. */
    proclist_foreach_modify(iter, &wakeup, lwWaitLink)
    {
        PGPROC     *waiter = GetPGProcByNumber(iter.cur);

        LOG_LWDEBUG("LWLockRelease", lock, "release waiter");
        proclist_delete(&wakeup, iter.cur, lwWaitLink);

        /*
         * Guarantee that lwWaiting being unset only becomes visible once the
         * unlink from the list has completed.  Otherwise the target backend
         * could be woken up for another reason and enqueue for a new lock -
         * if that happens before the list unlink happens, the list would end
         * up being corrupted.
         *
         * The barrier pairs with the LWLockWaitListLock() when enqueuing for
         * another lock.
         */
        pg_write_barrier();
        waiter->lwWaiting = false;
        PGSemaphoreUnlock(waiter->sem);
    }
}

/*
 * Add ourselves to the end of the queue.
 *
 * NB: Mode can be LW_WAIT_UNTIL_FREE here!
 */
static void
LWLockQueueSelf(LWLock *lock, LWLockMode mode)
{
    /*
     * If we don't have a PGPROC structure, there's no way to wait.  This
     * should never occur, since MyProc should only be null during shared
     * memory initialization.
     */
    if (MyProc == NULL)
        elog(PANIC, "cannot wait without a PGPROC structure");

    if (MyProc->lwWaiting)
        elog(PANIC, "queueing for lock while waiting on another one");

    LWLockWaitListLock(lock);

    /* setting the flag is protected by the spinlock */
    pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_HAS_WAITERS);

    MyProc->lwWaiting = true;
    MyProc->lwWaitMode = mode;

    /* LW_WAIT_UNTIL_FREE waiters are always at the front of the queue */
    if (mode == LW_WAIT_UNTIL_FREE)
        proclist_push_head(&lock->waiters, MyProc->pgprocno, lwWaitLink);
    else
        proclist_push_tail(&lock->waiters, MyProc->pgprocno, lwWaitLink);

    /* Can release the mutex now */
    LWLockWaitListUnlock(lock);

#ifdef LOCK_DEBUG
    pg_atomic_fetch_add_u32(&lock->nwaiters, 1);
#endif
}

/*
 * Remove ourselves from the waitlist.
 *
 * This is used if we queued ourselves because we thought we needed to sleep
 * but, after further checking, we discovered that we don't actually need to
 * do so.
 */
static void
LWLockDequeueSelf(LWLock *lock)
{
    bool        found = false;
    proclist_mutable_iter iter;

#ifdef LWLOCK_STATS
    lwlock_stats *lwstats;

    lwstats = get_lwlock_stats_entry(lock);

    lwstats->dequeue_self_count++;
#endif

    LWLockWaitListLock(lock);

    /*
     * We can't just remove ourselves from the list; we need to iterate over
     * all entries, as somebody else could have dequeued us.
     */
    proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
    {
        if (iter.cur == MyProc->pgprocno)
        {
            found = true;
            proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
            break;
        }
    }

    if (proclist_is_empty(&lock->waiters) &&
        (pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS) != 0)
    {
        pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_HAS_WAITERS);
    }

    /* XXX: combine with fetch_and above? */
    LWLockWaitListUnlock(lock);

    /* clear waiting state again, nice for debugging */
    if (found)
        MyProc->lwWaiting = false;
    else
    {
        int         extraWaits = 0;

        /*
         * Somebody else dequeued us and has or will wake us up.  Deal with
         * the superfluous absorption of a wakeup.
         */

        /*
         * Reset the RELEASE_OK flag if somebody woke us before we removed
         * ourselves - they'll have set it to false.
         */
        pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);

        /*
         * Now wait for the scheduled wakeup, otherwise our ->lwWaiting would
         * get reset at some inconvenient point later.  Most of the time this
         * will immediately return.
         */
        for (;;)
        {
            PGSemaphoreLock(MyProc->sem);
            if (!MyProc->lwWaiting)
                break;
            extraWaits++;
        }

        /*
         * Fix the process wait semaphore's count for any absorbed wakeups.
         */
        while (extraWaits-- > 0)
            PGSemaphoreUnlock(MyProc->sem);
    }

#ifdef LOCK_DEBUG
    {
        /* not waiting anymore */
        uint32      nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);

        Assert(nwaiters < MAX_BACKENDS);
    }
#endif
}

/*
 * LWLockAcquire - acquire a lightweight lock in the specified mode
 *
 * If the lock is not available, sleep until it is.  Returns true if the lock
 * was available immediately, false if we had to sleep.
 *
 * Side effect: cancel/die interrupts are held off until lock release.
 */
bool
LWLockAcquire(LWLock *lock, LWLockMode mode)
{
    PGPROC     *proc = MyProc;
    bool        result = true;
    int         extraWaits = 0;
#ifdef LWLOCK_STATS
    lwlock_stats *lwstats;

    lwstats = get_lwlock_stats_entry(lock);
#endif

    AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);

    PRINT_LWDEBUG("LWLockAcquire", lock, mode);

#ifdef LWLOCK_STATS
    /* Count lock acquisition attempts */
    if (mode == LW_EXCLUSIVE)
        lwstats->ex_acquire_count++;
    else
        lwstats->sh_acquire_count++;
#endif                          /* LWLOCK_STATS */

    /*
     * We can't wait if we haven't got a PGPROC.  This should only occur
     * during bootstrap or shared memory initialization.  Put an Assert here
     * to catch unsafe coding practices.
     */
    Assert(!(proc == NULL && IsUnderPostmaster));

    /* Ensure we will have room to remember the lock */
    if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
        elog(ERROR, "too many LWLocks taken");

    /*
     * Lock out cancel/die interrupts until we exit the code section protected
     * by the LWLock.  This ensures that interrupts will not interfere with
     * manipulations of data structures in shared memory.
     */
    HOLD_INTERRUPTS();

    /*
     * Loop here to try to acquire lock after each time we are signaled by
     * LWLockRelease.
     *
     * NOTE: it might seem better to have LWLockRelease actually grant us the
     * lock, rather than retrying and possibly having to go back to sleep. But
     * in practice that is no good because it means a process swap for every
     * lock acquisition when two or more processes are contending for the same
     * lock.  Since LWLocks are normally used to protect not-very-long
     * sections of computation, a process needs to be able to acquire and
     * release the same lock many times during a single CPU time slice, even
     * in the presence of contention.  The efficiency of being able to do that
     * outweighs the inefficiency of sometimes wasting a process dispatch
     * cycle because the lock is not free when a released waiter finally gets
     * to run.  See pgsql-hackers archives for 29-Dec-01.
     */
    for (;;)
    {
        bool        mustwait;

        /*
         * Try to grab the lock the first time; we're not in the waitqueue
         * yet/anymore.
         */
        mustwait = LWLockAttemptLock(lock, mode);

        if (!mustwait)
        {
            LOG_LWDEBUG("LWLockAcquire", lock, "immediately acquired lock");
            break;              /* got the lock */
        }

        /*
         * Ok, at this point we couldn't grab the lock on the first try.  We
         * cannot simply queue ourselves to the end of the list and wait to be
         * woken up because by now the lock could long have been released.
         * Instead add us to the queue and try to grab the lock again.  If we
         * succeed we need to revert the queuing and be happy, otherwise we
         * recheck the lock.  If we still couldn't grab it, we know that the
         * other locker will see our queue entries when releasing since they
         * existed before we checked for the lock.
         */

        /* add to the queue */
        LWLockQueueSelf(lock, mode);

        /* we're now guaranteed to be woken up if necessary */
        mustwait = LWLockAttemptLock(lock, mode);

        /* ok, grabbed the lock the second time round, need to undo queueing */
        if (!mustwait)
        {
            LOG_LWDEBUG("LWLockAcquire", lock, "acquired, undoing queue");

            LWLockDequeueSelf(lock);
            break;
        }

        /*
         * Wait until awakened.
         *
         * It is possible that we get awakened for a reason other than being
         * signaled by LWLockRelease.  If so, loop back and wait again.  Once
         * we've gotten the LWLock, re-increment the sema by the number of
         * additional signals received.
         */
        LOG_LWDEBUG("LWLockAcquire", lock, "waiting");

#ifdef LWLOCK_STATS
        lwstats->block_count++;
#endif

        LWLockReportWaitStart(lock);
        if (TRACE_POSTGRESQL_LWLOCK_WAIT_START_ENABLED())
            TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);

        for (;;)
        {
            PGSemaphoreLock(proc->sem);
            if (!proc->lwWaiting)
                break;
            extraWaits++;
        }

        /* Retrying, allow LWLockRelease to release waiters again. */
        pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);

#ifdef LOCK_DEBUG
        {
            /* not waiting anymore */
            uint32      nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);

            Assert(nwaiters < MAX_BACKENDS);
        }
#endif

        if (TRACE_POSTGRESQL_LWLOCK_WAIT_DONE_ENABLED())
            TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
        LWLockReportWaitEnd();

        LOG_LWDEBUG("LWLockAcquire", lock, "awakened");

        /* Now loop back and try to acquire lock again. */
        result = false;
    }

    if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_ENABLED())
        TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), mode);

    /* Add lock to list of locks held by this backend */
    held_lwlocks[num_held_lwlocks].lock = lock;
    held_lwlocks[num_held_lwlocks++].mode = mode;

    /*
     * Fix the process wait semaphore's count for any absorbed wakeups.
     */
    while (extraWaits-- > 0)
        PGSemaphoreUnlock(proc->sem);

    return result;
}
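
/*
 * For illustration, the canonical usage pattern (a sketch; "lock" stands
 * for any LWLock, e.g. one obtained from GetNamedLWLockTranche()):
 *
 *		LWLockAcquire(lock, LW_SHARED);
 *		-- ... read the shared structure the lock protects ...
 *		LWLockRelease(lock);
 *
 * Since cancel/die interrupts stay held off for the whole protected section,
 * the code between acquire and release should be kept short.
 */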

/*
 * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
 *
 * If the lock is not available, return false with no side-effects.
 *
 * If successful, cancel/die interrupts are held off until lock release.
 */
bool
LWLockConditionalAcquire(LWLock *lock, LWLockMode mode)
{
    bool        mustwait;

    AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);

    PRINT_LWDEBUG("LWLockConditionalAcquire", lock, mode);

    /* Ensure we will have room to remember the lock */
    if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
        elog(ERROR, "too many LWLocks taken");

    /*
     * Lock out cancel/die interrupts until we exit the code section protected
     * by the LWLock.  This ensures that interrupts will not interfere with
     * manipulations of data structures in shared memory.
     */
    HOLD_INTERRUPTS();

    /* Check for the lock */
    mustwait = LWLockAttemptLock(lock, mode);

    if (mustwait)
    {
        /* Failed to get lock, so release interrupt holdoff */
        RESUME_INTERRUPTS();

        LOG_LWDEBUG("LWLockConditionalAcquire", lock, "failed");
        if (TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL_ENABLED())
            TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(T_NAME(lock), mode);
    }
    else
    {
        /* Add lock to list of locks held by this backend */
        held_lwlocks[num_held_lwlocks].lock = lock;
        held_lwlocks[num_held_lwlocks++].mode = mode;
        if (TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_ENABLED())
            TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(T_NAME(lock), mode);
    }
    return !mustwait;
}
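
/*
 * For illustration, the try-lock pattern this function supports (a sketch;
 * do_something_else() is a hypothetical fallback):
 *
 *		if (LWLockConditionalAcquire(lock, LW_EXCLUSIVE))
 *		{
 *			-- got it: update the protected structure
 *			LWLockRelease(lock);
 *		}
 *		else
 *		{
 *			-- contended: skip or defer the work instead of sleeping
 *			do_something_else();
 *		}
 */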

/*
 * LWLockAcquireOrWait - Acquire lock, or wait until it's free
 *
 * The semantics of this function are a bit funky.  If the lock is currently
 * free, it is acquired in the given mode, and the function returns true.  If
 * the lock isn't immediately free, the function waits until it is released
 * and returns false, but does not acquire the lock.
 *
 * This is currently used for WALWriteLock: when a backend flushes the WAL,
 * holding WALWriteLock, it can flush the commit records of many other
 * backends as a side-effect.  Those other backends need to wait until the
 * flush finishes, but don't need to acquire the lock anymore.  They can just
 * wake up, observe that their records have already been flushed, and return.
 */
bool
LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
{
    PGPROC     *proc = MyProc;
    bool        mustwait;
    int         extraWaits = 0;
#ifdef LWLOCK_STATS
    lwlock_stats *lwstats;

    lwstats = get_lwlock_stats_entry(lock);
#endif

    Assert(mode == LW_SHARED || mode == LW_EXCLUSIVE);

    PRINT_LWDEBUG("LWLockAcquireOrWait", lock, mode);

    /* Ensure we will have room to remember the lock */
    if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
        elog(ERROR, "too many LWLocks taken");

    /*
     * Lock out cancel/die interrupts until we exit the code section protected
     * by the LWLock.  This ensures that interrupts will not interfere with
     * manipulations of data structures in shared memory.
     */
    HOLD_INTERRUPTS();

    /*
     * NB: We're using nearly the same twice-in-a-row lock acquisition
     * protocol as LWLockAcquire().  Check its comments for details.
     */
    mustwait = LWLockAttemptLock(lock, mode);

    if (mustwait)
    {
        LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);

        mustwait = LWLockAttemptLock(lock, mode);

        if (mustwait)
        {
            /*
             * Wait until awakened.  Like in LWLockAcquire, be prepared for
             * bogus wakeups.
             */
            LOG_LWDEBUG("LWLockAcquireOrWait", lock, "waiting");

#ifdef LWLOCK_STATS
            lwstats->block_count++;
#endif

            LWLockReportWaitStart(lock);
            if (TRACE_POSTGRESQL_LWLOCK_WAIT_START_ENABLED())
                TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);

            for (;;)
            {
                PGSemaphoreLock(proc->sem);
                if (!proc->lwWaiting)
                    break;
                extraWaits++;
            }

#ifdef LOCK_DEBUG
            {
                /* not waiting anymore */
                uint32      nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);

                Assert(nwaiters < MAX_BACKENDS);
            }
#endif
            if (TRACE_POSTGRESQL_LWLOCK_WAIT_DONE_ENABLED())
                TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
            LWLockReportWaitEnd();

            LOG_LWDEBUG("LWLockAcquireOrWait", lock, "awakened");
        }
        else
        {
            LOG_LWDEBUG("LWLockAcquireOrWait", lock, "acquired, undoing queue");

            /*
             * Got lock in the second attempt, undo queueing.  We need to
             * treat this as having successfully acquired the lock, otherwise
             * we'd not necessarily wake up people we've prevented from
             * acquiring the lock.
             */
            LWLockDequeueSelf(lock);
        }
    }

    /*
     * Fix the process wait semaphore's count for any absorbed wakeups.
     */
    while (extraWaits-- > 0)
        PGSemaphoreUnlock(proc->sem);

    if (mustwait)
    {
        /* Failed to get lock, so release interrupt holdoff */
        RESUME_INTERRUPTS();
        LOG_LWDEBUG("LWLockAcquireOrWait", lock, "failed");
        if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_FAIL_ENABLED())
            TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_FAIL(T_NAME(lock), mode);
    }
    else
    {
        LOG_LWDEBUG("LWLockAcquireOrWait", lock, "succeeded");
        /* Add lock to list of locks held by this backend */
        held_lwlocks[num_held_lwlocks].lock = lock;
        held_lwlocks[num_held_lwlocks++].mode = mode;
        if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_ENABLED())
            TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT(T_NAME(lock), mode);
    }

    return !mustwait;
}
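
/*
 * For illustration, the WALWriteLock-style usage described above, reduced
 * to a sketch (the flush logic is a placeholder for what xlog.c really
 * does):
 *
 *		if (LWLockAcquireOrWait(WALWriteLock, LW_EXCLUSIVE))
 *		{
 *			-- got the lock: flush WAL up to the requested point
 *			LWLockRelease(WALWriteLock);
 *		}
 *		else
 *		{
 *			-- somebody else held the lock and has since released it;
 *			-- recheck whether our records were flushed as a side-effect,
 *			-- and only loop back if they weren't
 *		}
 */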

/*
 * Does the lwlock in its current state need to wait for the variable value to
 * change?
 *
 * If we don't need to wait, and it's because the value of the variable has
 * changed, store the current value in newval.
 *
 * *result is set to true if the lock was free, and false otherwise.
 */
static bool
LWLockConflictsWithVar(LWLock *lock,
                       uint64 *valptr, uint64 oldval, uint64 *newval,
                       bool *result)
{
    bool        mustwait;
    uint64      value;

    /*
     * Test first to see if the lock is free right now.
     *
     * XXX: the caller uses a spinlock before this, so we don't need a memory
     * barrier here as far as the current usage is concerned.  But that might
     * not be safe in general.
     */
    mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;

    if (!mustwait)
    {
        *result = true;
        return false;
    }

    *result = false;

    /*
     * Read the value using the lwlock's wait list lock, as we can't generally
     * rely on atomic 64 bit reads/stores.  TODO: On platforms with a way to
     * do atomic 64 bit reads/writes the spinlock should be optimized away.
     */
    LWLockWaitListLock(lock);
    value = *valptr;
    LWLockWaitListUnlock(lock);

    if (value != oldval)
    {
        mustwait = false;
        *newval = value;
    }
    else
    {
        mustwait = true;
    }

    return mustwait;
}

/*
 * LWLockWaitForVar - Wait until lock is free, or a variable is updated.
 *
 * If the lock is held and *valptr equals oldval, waits until the lock is
 * either freed, or the lock holder updates *valptr by calling
 * LWLockUpdateVar.  If the lock is free on exit (immediately or after
 * waiting), returns true.  If the lock is still held, but *valptr no longer
 * matches oldval, returns false and sets *newval to the current value in
 * *valptr.
 *
 * Note: this function ignores shared lock holders; if the lock is held
 * in shared mode, returns 'true'.
 */
bool
LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
{
    PGPROC     *proc = MyProc;
    int         extraWaits = 0;
    bool        result = false;
#ifdef LWLOCK_STATS
    lwlock_stats *lwstats;

    lwstats = get_lwlock_stats_entry(lock);
#endif

    PRINT_LWDEBUG("LWLockWaitForVar", lock, LW_WAIT_UNTIL_FREE);

    /*
     * Lock out cancel/die interrupts while we sleep on the lock.  There is no
     * cleanup mechanism to remove us from the wait queue if we got
     * interrupted.
     */
    HOLD_INTERRUPTS();

    /*
     * Loop here to check the lock's status after each time we are signaled.
     */
    for (;;)
    {
        bool        mustwait;

        mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
                                          &result);

        if (!mustwait)
            break;              /* the lock was free or value didn't match */

        /*
         * Add myself to the wait queue.  Note that this is racy; somebody
         * else could wake up before we're finished queuing.  NB: We're using
         * nearly the same twice-in-a-row lock acquisition protocol as
         * LWLockAcquire().  Check its comments for details.  The only
         * difference is that we also have to check the variable's value when
         * checking the state of the lock.
         */
        LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);

        /*
         * Set the RELEASE_OK flag, to make sure we get woken up as soon as
         * the lock is released.
         */
        pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);

        /*
         * We're now guaranteed to be woken up if necessary.  Recheck the lock
         * and variable's state.
         */
        mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
                                          &result);

        /* Ok, no conflict after we queued ourselves.  Undo queueing. */
        if (!mustwait)
        {
            LOG_LWDEBUG("LWLockWaitForVar", lock, "free, undoing queue");

            LWLockDequeueSelf(lock);
            break;
        }

        /*
         * Wait until awakened.
         *
         * It is possible that we get awakened for a reason other than being
         * signaled by LWLockRelease.  If so, loop back and wait again.  Once
         * we've gotten the LWLock, re-increment the sema by the number of
         * additional signals received.
         */
        LOG_LWDEBUG("LWLockWaitForVar", lock, "waiting");

#ifdef LWLOCK_STATS
        lwstats->block_count++;
#endif

        LWLockReportWaitStart(lock);
        if (TRACE_POSTGRESQL_LWLOCK_WAIT_START_ENABLED())
            TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), LW_EXCLUSIVE);

        for (;;)
        {
            PGSemaphoreLock(proc->sem);
            if (!proc->lwWaiting)
                break;
            extraWaits++;
        }

#ifdef LOCK_DEBUG
        {
            /* not waiting anymore */
            uint32      nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);

            Assert(nwaiters < MAX_BACKENDS);
        }
#endif

        if (TRACE_POSTGRESQL_LWLOCK_WAIT_DONE_ENABLED())
            TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), LW_EXCLUSIVE);
        LWLockReportWaitEnd();

        LOG_LWDEBUG("LWLockWaitForVar", lock, "awakened");

        /* Now loop back and check the status of the lock again. */
    }

    /*
     * Fix the process wait semaphore's count for any absorbed wakeups.
     */
    while (extraWaits-- > 0)
        PGSemaphoreUnlock(proc->sem);

    /*
     * Now okay to allow cancel/die interrupts.
     */
    RESUME_INTERRUPTS();

    return result;
}


/*
 * LWLockUpdateVar - Update a variable and wake up waiters atomically
 *
 * Sets *valptr to 'val', and wakes up all processes waiting for us with
 * LWLockWaitForVar().  Setting the value and waking up the processes happen
 * atomically so that any process calling LWLockWaitForVar() on the same lock
 * is guaranteed to see the new value, and act accordingly.
 *
 * The caller must be holding the lock in exclusive mode.
 */
void
LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
{
    proclist_head wakeup;
    proclist_mutable_iter iter;

    PRINT_LWDEBUG("LWLockUpdateVar", lock, LW_EXCLUSIVE);

    proclist_init(&wakeup);

    LWLockWaitListLock(lock);

    Assert(pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE);

    /* Update the lock's value */
    *valptr = val;

    /*
     * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
     * up.  They are always in the front of the queue.
     */
    proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
    {
        PGPROC     *waiter = GetPGProcByNumber(iter.cur);

        if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
            break;

        proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
        proclist_push_tail(&wakeup, iter.cur, lwWaitLink);
    }

    /* We are done updating shared state of the lock itself. */
    LWLockWaitListUnlock(lock);

    /*
     * Awaken any waiters I removed from the queue.
     */
    proclist_foreach_modify(iter, &wakeup, lwWaitLink)
    {
        PGPROC     *waiter = GetPGProcByNumber(iter.cur);

        proclist_delete(&wakeup, iter.cur, lwWaitLink);
        /* check comment in LWLockWakeup() about this barrier */
        pg_write_barrier();
        waiter->lwWaiting = false;
        PGSemaphoreUnlock(waiter->sem);
    }
}


/*
 * LWLockRelease - release a previously acquired lock
 */
void
LWLockRelease(LWLock *lock)
{
    LWLockMode  mode;
    uint32      oldstate;
    bool        check_waiters;
    int         i;

    /*
     * Remove lock from list of locks held.  Usually, but not always, it will
     * be the latest-acquired lock; so search array backwards.
     */
    for (i = num_held_lwlocks; --i >= 0;)
        if (lock == held_lwlocks[i].lock)
            break;

    if (i < 0)
        elog(ERROR, "lock %s is not held", T_NAME(lock));

    mode = held_lwlocks[i].mode;

    num_held_lwlocks--;
    for (; i < num_held_lwlocks; i++)
        held_lwlocks[i] = held_lwlocks[i + 1];

    PRINT_LWDEBUG("LWLockRelease", lock, mode);

    /*
     * Release my hold on the lock; after that it can immediately be acquired
     * by others, even if we still have to wake up other waiters.
     */
    if (mode == LW_EXCLUSIVE)
        oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_EXCLUSIVE);
    else
        oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_SHARED);

    /* nobody else can have that kind of lock */
    Assert(!(oldstate & LW_VAL_EXCLUSIVE));

    if (TRACE_POSTGRESQL_LWLOCK_RELEASE_ENABLED())
        TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(lock));

    /*
     * We're still waiting for backends to get scheduled, don't wake them up
     * again.
     */
    if ((oldstate & (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK)) ==
        (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK) &&
        (oldstate & LW_LOCK_MASK) == 0)
        check_waiters = true;
    else
        check_waiters = false;

    /*
     * As waking up waiters requires the spinlock to be acquired, only do so
     * if necessary.
     */
    if (check_waiters)
    {
        /* XXX: remove before commit? */
        LOG_LWDEBUG("LWLockRelease", lock, "releasing waiters");
        LWLockWakeup(lock);
    }

    /*
     * Now okay to allow cancel/die interrupts.
     */
    RESUME_INTERRUPTS();
}

/*
 * LWLockReleaseClearVar - release a previously acquired lock, reset variable
 */
void
LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val)
{
    LWLockWaitListLock(lock);

    /*
     * Set the variable's value before releasing the lock; that prevents a
     * race condition wherein a new locker acquires the lock, but hasn't yet
     * set the variable's value.
     */
    *valptr = val;
    LWLockWaitListUnlock(lock);

    LWLockRelease(lock);
}


/*
 * LWLockReleaseAll - release all currently-held locks
 *
 * Used to clean up after ereport(ERROR).  An important difference between
 * this function and retail LWLockRelease calls is that InterruptHoldoffCount
 * is unchanged by this operation.  This is necessary since
 * InterruptHoldoffCount has been set to an appropriate level earlier in error
 * recovery.  We could decrement it below zero if we allow it to drop for each
 * released lock!
 */
void
LWLockReleaseAll(void)
{
    while (num_held_lwlocks > 0)
    {
        HOLD_INTERRUPTS();      /* match the upcoming RESUME_INTERRUPTS */

        LWLockRelease(held_lwlocks[num_held_lwlocks - 1].lock);
    }
}


/*
 * LWLockHeldByMe - test whether my process holds a lock in any mode
 *
 * This is meant as debug support only.
 */
bool
LWLockHeldByMe(LWLock *l)
{
    int         i;

    for (i = 0; i < num_held_lwlocks; i++)
    {
        if (held_lwlocks[i].lock == l)
            return true;
    }
    return false;
}

/*
 * LWLockHeldByMeInMode - test whether my process holds a lock in given mode
 *
 * This is meant as debug support only.
 */
bool
LWLockHeldByMeInMode(LWLock *l, LWLockMode mode)
{
    int         i;

    for (i = 0; i < num_held_lwlocks; i++)
    {
        if (held_lwlocks[i].lock == l && held_lwlocks[i].mode == mode)
            return true;
    }
    return false;
}