1 /*-------------------------------------------------------------------------
2 *
3 * sharedtuplestore.c
4 * Simple mechanism for sharing tuples between backends.
5 *
6 * This module contains a shared temporary tuple storage mechanism providing
7 * a parallel-aware subset of the features of tuplestore.c. Multiple backends
8 * can write to a SharedTuplestore, and then multiple backends can later scan
9 * the stored tuples. Currently, the only scan type supported is a parallel
10 * scan where each backend reads an arbitrary subset of the tuples that were
11 * written.
12 *
13 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
14 * Portions Copyright (c) 1994, Regents of the University of California
15 *
16 * IDENTIFICATION
17 * src/backend/utils/sort/sharedtuplestore.c
18 *
19 *-------------------------------------------------------------------------
20 */
21
22 #include "postgres.h"
23
24 #include "access/htup.h"
25 #include "access/htup_details.h"
26 #include "miscadmin.h"
27 #include "storage/buffile.h"
28 #include "storage/lwlock.h"
29 #include "storage/sharedfileset.h"
30 #include "utils/sharedtuplestore.h"
31
32 #include <limits.h>
33
34 /*
35 * The size of chunks, in pages. This is somewhat arbitrarily set to match
36 * the size of HASH_CHUNK, so that Parallel Hash obtains new chunks of tuples
37 * at approximately the same rate as it allocates new chunks of memory to
38 * insert them into.
39 */
40 #define STS_CHUNK_PAGES 4
41 #define STS_CHUNK_HEADER_SIZE offsetof(SharedTuplestoreChunk, data)
42 #define STS_CHUNK_DATA_SIZE (STS_CHUNK_PAGES * BLCKSZ - STS_CHUNK_HEADER_SIZE)
43
/*
 * Chunk written to disk.  A chunk is always STS_CHUNK_PAGES * BLCKSZ bytes on
 * disk; 'data' holds the packed tuples (and optional per-tuple meta-data).
 */
typedef struct SharedTuplestoreChunk
{
	int			ntuples;		/* Number of tuples in this chunk. */
	int			overflow;		/* If overflow, how many including this one?
								 * Zero for a regular (non-overflow) chunk. */
	char		data[FLEXIBLE_ARRAY_MEMBER];
} SharedTuplestoreChunk;
51
/*
 * Per-participant shared state.  Each writing participant has its own BufFile
 * and its own read head, which readers advance under 'lock'.
 */
typedef struct SharedTuplestoreParticipant
{
	LWLock		lock;			/* Protects read_page during parallel scans. */
	BlockNumber read_page;		/* Page number for next read. */
	BlockNumber npages;			/* Number of pages written. */
	bool		writing;		/* Used only for assertions. */
} SharedTuplestoreParticipant;
60
/* The control object that lives in shared memory. */
struct SharedTuplestore
{
	int			nparticipants;	/* Number of participants that can write. */
	int			flags;			/* Flag bits from SHARED_TUPLESTORE_XXX */
	size_t		meta_data_size; /* Size of per-tuple header. */
	char		name[NAMEDATALEN];	/* A name for this tuplestore. */

	/* Followed by per-participant shared state. */
	SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER];
};
72
/*
 * Per-participant state that lives in backend-local memory.  Each backend
 * that reads or writes obtains one of these via sts_initialize() or
 * sts_attach().
 */
struct SharedTuplestoreAccessor
{
	int			participant;	/* My participant number. */
	SharedTuplestore *sts;		/* The shared state. */
	SharedFileSet *fileset;		/* The SharedFileSet holding files. */
	MemoryContext context;		/* Memory context for buffers. */

	/* State for reading. */
	int			read_participant;	/* The current participant to read from. */
	BufFile    *read_file;		/* The current file to read from. */
	int			read_ntuples_available; /* The number of tuples in chunk. */
	int			read_ntuples;	/* How many tuples have we read from chunk? */
	size_t		read_bytes;		/* How many bytes have we read from chunk? */
	char	   *read_buffer;	/* A buffer for loading tuples. */
	size_t		read_buffer_size;	/* Allocated size of read_buffer. */
	BlockNumber read_next_page; /* Lowest block we'll consider reading. */

	/* State for writing. */
	SharedTuplestoreChunk *write_chunk; /* Buffer for writing. */
	BufFile    *write_file;		/* The current file to write to. */
	BlockNumber write_page;		/* The next page to write to. */
	char	   *write_pointer;	/* Current write pointer within chunk. */
	char	   *write_end;		/* One past the end of the current chunk. */
};
98
99 static void sts_filename(char *name, SharedTuplestoreAccessor *accessor,
100 int participant);
101
102 /*
103 * Return the amount of shared memory required to hold SharedTuplestore for a
104 * given number of participants.
105 */
106 size_t
sts_estimate(int participants)107 sts_estimate(int participants)
108 {
109 return offsetof(SharedTuplestore, participants) +
110 sizeof(SharedTuplestoreParticipant) * participants;
111 }
112
113 /*
114 * Initialize a SharedTuplestore in existing shared memory. There must be
115 * space for sts_estimate(participants) bytes. If flags includes the value
116 * SHARED_TUPLESTORE_SINGLE_PASS, the files may in future be removed more
117 * eagerly (but this isn't yet implemented).
118 *
119 * Tuples that are stored may optionally carry a piece of fixed sized
120 * meta-data which will be retrieved along with the tuple. This is useful for
121 * the hash values used in multi-batch hash joins, but could have other
122 * applications.
123 *
124 * The caller must supply a SharedFileSet, which is essentially a directory
125 * that will be cleaned up automatically, and a name which must be unique
126 * across all SharedTuplestores created in the same SharedFileSet.
127 */
128 SharedTuplestoreAccessor *
sts_initialize(SharedTuplestore * sts,int participants,int my_participant_number,size_t meta_data_size,int flags,SharedFileSet * fileset,const char * name)129 sts_initialize(SharedTuplestore *sts, int participants,
130 int my_participant_number,
131 size_t meta_data_size,
132 int flags,
133 SharedFileSet *fileset,
134 const char *name)
135 {
136 SharedTuplestoreAccessor *accessor;
137 int i;
138
139 Assert(my_participant_number < participants);
140
141 sts->nparticipants = participants;
142 sts->meta_data_size = meta_data_size;
143 sts->flags = flags;
144
145 if (strlen(name) > sizeof(sts->name) - 1)
146 elog(ERROR, "SharedTuplestore name too long");
147 strcpy(sts->name, name);
148
149 /*
150 * Limit meta-data so it + tuple size always fits into a single chunk.
151 * sts_puttuple() and sts_read_tuple() could be made to support scenarios
152 * where that's not the case, but it's not currently required. If so,
153 * meta-data size probably should be made variable, too.
154 */
155 if (meta_data_size + sizeof(uint32) >= STS_CHUNK_DATA_SIZE)
156 elog(ERROR, "meta-data too long");
157
158 for (i = 0; i < participants; ++i)
159 {
160 LWLockInitialize(&sts->participants[i].lock,
161 LWTRANCHE_SHARED_TUPLESTORE);
162 sts->participants[i].read_page = 0;
163 sts->participants[i].writing = false;
164 }
165
166 accessor = palloc0(sizeof(SharedTuplestoreAccessor));
167 accessor->participant = my_participant_number;
168 accessor->sts = sts;
169 accessor->fileset = fileset;
170 accessor->context = CurrentMemoryContext;
171
172 return accessor;
173 }
174
175 /*
176 * Attach to a SharedTuplestore that has been initialized by another backend,
177 * so that this backend can read and write tuples.
178 */
179 SharedTuplestoreAccessor *
sts_attach(SharedTuplestore * sts,int my_participant_number,SharedFileSet * fileset)180 sts_attach(SharedTuplestore *sts,
181 int my_participant_number,
182 SharedFileSet *fileset)
183 {
184 SharedTuplestoreAccessor *accessor;
185
186 Assert(my_participant_number < sts->nparticipants);
187
188 accessor = palloc0(sizeof(SharedTuplestoreAccessor));
189 accessor->participant = my_participant_number;
190 accessor->sts = sts;
191 accessor->fileset = fileset;
192 accessor->context = CurrentMemoryContext;
193
194 return accessor;
195 }
196
197 static void
sts_flush_chunk(SharedTuplestoreAccessor * accessor)198 sts_flush_chunk(SharedTuplestoreAccessor *accessor)
199 {
200 size_t size;
201
202 size = STS_CHUNK_PAGES * BLCKSZ;
203 BufFileWrite(accessor->write_file, accessor->write_chunk, size);
204 memset(accessor->write_chunk, 0, size);
205 accessor->write_pointer = &accessor->write_chunk->data[0];
206 accessor->sts->participants[accessor->participant].npages +=
207 STS_CHUNK_PAGES;
208 }
209
210 /*
211 * Finish writing tuples. This must be called by all backends that have
212 * written data before any backend begins reading it.
213 */
214 void
sts_end_write(SharedTuplestoreAccessor * accessor)215 sts_end_write(SharedTuplestoreAccessor *accessor)
216 {
217 if (accessor->write_file != NULL)
218 {
219 sts_flush_chunk(accessor);
220 BufFileClose(accessor->write_file);
221 pfree(accessor->write_chunk);
222 accessor->write_chunk = NULL;
223 accessor->write_file = NULL;
224 accessor->sts->participants[accessor->participant].writing = false;
225 }
226 }
227
228 /*
229 * Prepare to rescan. Only one participant must call this. After it returns,
230 * all participants may call sts_begin_parallel_scan() and then loop over
231 * sts_parallel_scan_next(). This function must not be called concurrently
232 * with a scan, and synchronization to avoid that is the caller's
233 * responsibility.
234 */
void
sts_reinitialize(SharedTuplestoreAccessor *accessor)
{
	int			i;

	/*
	 * Reset the shared read head for all participants' files, so that a new
	 * scan will visit every chunk again from the beginning.  The data and
	 * page counts themselves are left intact.
	 */
	for (i = 0; i < accessor->sts->nparticipants; ++i)
	{
		accessor->sts->participants[i].read_page = 0;
	}
}
250
251 /*
252 * Begin scanning the contents in parallel.
253 */
254 void
sts_begin_parallel_scan(SharedTuplestoreAccessor * accessor)255 sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
256 {
257 int i PG_USED_FOR_ASSERTS_ONLY;
258
259 /* End any existing scan that was in progress. */
260 sts_end_parallel_scan(accessor);
261
262 /*
263 * Any backend that might have written into this shared tuplestore must
264 * have called sts_end_write(), so that all buffers are flushed and the
265 * files have stopped growing.
266 */
267 for (i = 0; i < accessor->sts->nparticipants; ++i)
268 Assert(!accessor->sts->participants[i].writing);
269
270 /*
271 * We will start out reading the file that THIS backend wrote. There may
272 * be some caching locality advantage to that.
273 */
274 accessor->read_participant = accessor->participant;
275 accessor->read_file = NULL;
276 accessor->read_next_page = 0;
277 }
278
279 /*
280 * Finish a parallel scan, freeing associated backend-local resources.
281 */
282 void
sts_end_parallel_scan(SharedTuplestoreAccessor * accessor)283 sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
284 {
285 /*
286 * Here we could delete all files if SHARED_TUPLESTORE_SINGLE_PASS, but
287 * we'd probably need a reference count of current parallel scanners so we
288 * could safely do it only when the reference count reaches zero.
289 */
290 if (accessor->read_file != NULL)
291 {
292 BufFileClose(accessor->read_file);
293 accessor->read_file = NULL;
294 }
295 }
296
297 /*
298 * Write a tuple. If a meta-data size was provided to sts_initialize, then a
299 * pointer to meta data of that size must be provided.
300 */
void
sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
			 MinimalTuple tuple)
{
	size_t		size;

	/* Do we have our own file yet? */
	if (accessor->write_file == NULL)
	{
		SharedTuplestoreParticipant *participant;
		char		name[MAXPGPATH];

		/* Create one.  Only this backend will write into it. */
		sts_filename(name, accessor, accessor->participant);
		accessor->write_file = BufFileCreateShared(accessor->fileset, name);

		/* Set up the shared state for this backend's file. */
		participant = &accessor->sts->participants[accessor->participant];
		participant->writing = true;	/* for assertions only */
	}

	/* Do we have space? */
	size = accessor->sts->meta_data_size + tuple->t_len;
	if (accessor->write_pointer + size >= accessor->write_end)
	{
		if (accessor->write_chunk == NULL)
		{
			/*
			 * First time through.  Allocate chunk.  (Both write_pointer and
			 * write_end are still NULL on the first call, so the space test
			 * above deliberately lands us in this branch.)
			 */
			accessor->write_chunk = (SharedTuplestoreChunk *)
				MemoryContextAllocZero(accessor->context,
									   STS_CHUNK_PAGES * BLCKSZ);
			accessor->write_chunk->ntuples = 0;
			accessor->write_pointer = &accessor->write_chunk->data[0];
			accessor->write_end = (char *)
				accessor->write_chunk + STS_CHUNK_PAGES * BLCKSZ;
		}
		else
		{
			/* See if flushing helps. */
			sts_flush_chunk(accessor);
		}

		/* It may still not be enough in the case of a gigantic tuple. */
		if (accessor->write_pointer + size >= accessor->write_end)
		{
			size_t		written;

			/*
			 * We'll write the beginning of the oversized tuple, and then
			 * write the rest in some number of 'overflow' chunks.
			 *
			 * sts_initialize() verifies that the size of the tuple +
			 * meta-data always fits into a chunk.  Because the chunk has been
			 * flushed above, we can be sure to have all of a chunk's usable
			 * space available.
			 */
			Assert(accessor->write_pointer + accessor->sts->meta_data_size +
				   sizeof(uint32) < accessor->write_end);

			/* Write the meta-data as one chunk. */
			if (accessor->sts->meta_data_size > 0)
				memcpy(accessor->write_pointer, meta_data,
					   accessor->sts->meta_data_size);

			/*
			 * Write as much of the tuple as we can fit. This includes the
			 * tuple's size at the start.
			 */
			written = accessor->write_end - accessor->write_pointer -
				accessor->sts->meta_data_size;
			memcpy(accessor->write_pointer + accessor->sts->meta_data_size,
				   tuple, written);
			++accessor->write_chunk->ntuples;
			size -= accessor->sts->meta_data_size;
			size -= written;
			/* Now write as many overflow chunks as we need for the rest. */
			while (size > 0)
			{
				size_t		written_this_chunk;

				sts_flush_chunk(accessor);

				/*
				 * How many overflow chunks to go?  This count lets readers
				 * skip the whole overflow chain at once instead of reading
				 * each chunk; see sts_parallel_scan_next().
				 */
				accessor->write_chunk->overflow = (size + STS_CHUNK_DATA_SIZE - 1) /
					STS_CHUNK_DATA_SIZE;
				written_this_chunk =
					Min(accessor->write_end - accessor->write_pointer, size);
				memcpy(accessor->write_pointer, (char *) tuple + written,
					   written_this_chunk);
				accessor->write_pointer += written_this_chunk;
				size -= written_this_chunk;
				written += written_this_chunk;
			}
			/* Oversized tuple fully written; nothing more to copy below. */
			return;
		}
	}

	/* Copy meta-data and tuple into buffer. */
	if (accessor->sts->meta_data_size > 0)
		memcpy(accessor->write_pointer, meta_data,
			   accessor->sts->meta_data_size);
	memcpy(accessor->write_pointer + accessor->sts->meta_data_size, tuple,
		   tuple->t_len);
	accessor->write_pointer += size;
	++accessor->write_chunk->ntuples;
}
410
/*
 * Read the next tuple (and its meta-data, if any) from the current chunk of
 * the current read file, following the overflow-chunk chain if the tuple
 * spilled past the end of the chunk.  The returned MinimalTuple points into
 * accessor->read_buffer and remains valid until the next call.
 */
static MinimalTuple
sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data)
{
	MinimalTuple tuple;
	uint32		size;
	size_t		remaining_size;
	size_t		this_chunk_size;
	char	   *destination;

	/*
	 * We'll keep track of bytes read from this chunk so that we can detect an
	 * overflowing tuple and switch to reading overflow pages.
	 */
	if (accessor->sts->meta_data_size > 0)
	{
		if (BufFileRead(accessor->read_file,
						meta_data,
						accessor->sts->meta_data_size) !=
			accessor->sts->meta_data_size)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read from shared tuplestore temporary file"),
					 errdetail_internal("Short read while reading meta-data.")));
		accessor->read_bytes += accessor->sts->meta_data_size;
	}
	/* The on-disk tuple starts with its total length (t_len). */
	if (BufFileRead(accessor->read_file,
					&size,
					sizeof(size)) != sizeof(size))
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not read from shared tuplestore temporary file"),
				 errdetail_internal("Short read while reading size.")));
	accessor->read_bytes += sizeof(size);
	/* Grow the backend-local tuple buffer if this tuple won't fit. */
	if (size > accessor->read_buffer_size)
	{
		size_t		new_read_buffer_size;

		if (accessor->read_buffer != NULL)
			pfree(accessor->read_buffer);
		/* Double the buffer to amortize growth over many tuples. */
		new_read_buffer_size = Max(size, accessor->read_buffer_size * 2);
		accessor->read_buffer =
			MemoryContextAlloc(accessor->context, new_read_buffer_size);
		accessor->read_buffer_size = new_read_buffer_size;
	}
	/* We already consumed the size word; read the rest of the tuple. */
	remaining_size = size - sizeof(uint32);
	/* Don't read past the end of the current chunk. */
	this_chunk_size = Min(remaining_size,
						  BLCKSZ * STS_CHUNK_PAGES - accessor->read_bytes);
	/* Leave room at the front of the buffer for t_len, set at the bottom. */
	destination = accessor->read_buffer + sizeof(uint32);
	if (BufFileRead(accessor->read_file,
					destination,
					this_chunk_size) != this_chunk_size)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not read from shared tuplestore temporary file"),
				 errdetail_internal("Short read while reading tuple.")));
	accessor->read_bytes += this_chunk_size;
	remaining_size -= this_chunk_size;
	destination += this_chunk_size;
	++accessor->read_ntuples;

	/* Check if we need to read any overflow chunks. */
	while (remaining_size > 0)
	{
		/* We are now positioned at the start of an overflow chunk. */
		SharedTuplestoreChunk chunk_header;

		if (BufFileRead(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE) !=
			STS_CHUNK_HEADER_SIZE)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read from shared tuplestore temporary file"),
					 errdetail_internal("Short read while reading overflow chunk header.")));
		/* Plain assignment: we're at the start of a fresh chunk. */
		accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
		if (chunk_header.overflow == 0)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("unexpected chunk in shared tuplestore temporary file"),
					 errdetail_internal("Expected overflow chunk.")));
		accessor->read_next_page += STS_CHUNK_PAGES;
		this_chunk_size = Min(remaining_size,
							  BLCKSZ * STS_CHUNK_PAGES -
							  STS_CHUNK_HEADER_SIZE);
		if (BufFileRead(accessor->read_file,
						destination,
						this_chunk_size) != this_chunk_size)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read from shared tuplestore temporary file"),
					 errdetail_internal("Short read while reading tuple.")));
		accessor->read_bytes += this_chunk_size;
		remaining_size -= this_chunk_size;
		destination += this_chunk_size;

		/*
		 * These will be used to count regular tuples following the oversized
		 * tuple that spilled into this overflow chunk.
		 */
		accessor->read_ntuples = 0;
		accessor->read_ntuples_available = chunk_header.ntuples;
	}

	/* Fill in t_len at the front of the buffer and hand the tuple back. */
	tuple = (MinimalTuple) accessor->read_buffer;
	tuple->t_len = size;

	return tuple;
}
517
518 /*
519 * Get the next tuple in the current parallel scan.
520 */
/*
 * Get the next tuple in the current parallel scan, or NULL when this
 * backend's scan is exhausted.  Chunks are claimed from the shared read head
 * under each participant's lock, so concurrent scanners each see a disjoint
 * subset of the stored tuples.
 */
MinimalTuple
sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
{
	SharedTuplestoreParticipant *p;
	BlockNumber read_page;
	bool		eof;

	for (;;)
	{
		/* Can we read more tuples from the current chunk? */
		if (accessor->read_ntuples < accessor->read_ntuples_available)
			return sts_read_tuple(accessor, meta_data);

		/* Find the location of a new chunk to read. */
		p = &accessor->sts->participants[accessor->read_participant];

		LWLockAcquire(&p->lock, LW_EXCLUSIVE);
		/* We can skip directly past overflow pages we know about. */
		if (p->read_page < accessor->read_next_page)
			p->read_page = accessor->read_next_page;
		eof = p->read_page >= p->npages;
		if (!eof)
		{
			/* Claim the next chunk. */
			read_page = p->read_page;
			/* Advance the read head for the next reader. */
			p->read_page += STS_CHUNK_PAGES;
			accessor->read_next_page = p->read_page;
		}
		LWLockRelease(&p->lock);

		if (!eof)
		{
			SharedTuplestoreChunk chunk_header;
			size_t		nread;

			/* Make sure we have the file open. */
			if (accessor->read_file == NULL)
			{
				char		name[MAXPGPATH];

				sts_filename(name, accessor, accessor->read_participant);
				accessor->read_file =
					BufFileOpenShared(accessor->fileset, name);
			}

			/* Seek and load the chunk header. */
			if (BufFileSeekBlock(accessor->read_file, read_page) != 0)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not seek block %u in shared tuplestore temporary file",
								read_page)));
			nread = BufFileRead(accessor->read_file, &chunk_header,
								STS_CHUNK_HEADER_SIZE);
			if (nread != STS_CHUNK_HEADER_SIZE)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not read from shared tuplestore temporary file: read only %zu of %zu bytes",
								nread, STS_CHUNK_HEADER_SIZE)));

			/*
			 * If this is an overflow chunk, we skip it and any following
			 * overflow chunks all at once.
			 */
			if (chunk_header.overflow > 0)
			{
				accessor->read_next_page = read_page +
					chunk_header.overflow * STS_CHUNK_PAGES;
				continue;
			}

			/* Remember where we are in this freshly-claimed chunk. */
			accessor->read_ntuples = 0;
			accessor->read_ntuples_available = chunk_header.ntuples;
			accessor->read_bytes = STS_CHUNK_HEADER_SIZE;

			/* Go around again, so we can get a tuple from this chunk. */
		}
		else
		{
			if (accessor->read_file != NULL)
			{
				BufFileClose(accessor->read_file);
				accessor->read_file = NULL;
			}

			/*
			 * Try the next participant's file.  If we've gone full circle,
			 * we're done.
			 */
			accessor->read_participant = (accessor->read_participant + 1) %
				accessor->sts->nparticipants;
			if (accessor->read_participant == accessor->participant)
				break;
			accessor->read_next_page = 0;

			/* Go around again, so we can get a chunk from this file. */
		}
	}

	return NULL;
}
622
623 /*
624 * Create the name used for the BufFile that a given participant will write.
625 */
626 static void
sts_filename(char * name,SharedTuplestoreAccessor * accessor,int participant)627 sts_filename(char *name, SharedTuplestoreAccessor *accessor, int participant)
628 {
629 snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant);
630 }
631