/*-------------------------------------------------------------------------
 *
 * sharedtuplestore.c
 *	  Simple mechanism for sharing tuples between backends.
 *
 * This module contains a shared temporary tuple storage mechanism providing
 * a parallel-aware subset of the features of tuplestore.c.  Multiple backends
 * can write to a SharedTuplestore, and then multiple backends can later scan
 * the stored tuples.  Currently, the only scan type supported is a parallel
 * scan where each backend reads an arbitrary subset of the tuples that were
 * written.
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/utils/sort/sharedtuplestore.c
 *
 *-------------------------------------------------------------------------
 */
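
/*
 * Typical lifecycle, sketched from this file's own API for orientation:
 * the backend that sets up shared memory calls sts_initialize(), using
 * sts_estimate() to size the allocation; other backends call sts_attach().
 * Writers stream tuples in with sts_puttuple() and finish with
 * sts_end_write().  After all writers are done, readers call
 * sts_begin_parallel_scan(), loop over sts_parallel_scan_next() until it
 * returns NULL, and finally call sts_end_parallel_scan().
 */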

#include "postgres.h"

#include "access/htup.h"
#include "access/htup_details.h"
#include "miscadmin.h"
#include "storage/buffile.h"
#include "storage/lwlock.h"
#include "storage/sharedfileset.h"
#include "utils/sharedtuplestore.h"

#include <limits.h>

/*
 * The size of chunks, in pages.  This is somewhat arbitrarily set to match
 * the size of HASH_CHUNK, so that Parallel Hash obtains new chunks of tuples
 * at approximately the same rate as it allocates new chunks of memory to
 * insert them into.
 */
#define STS_CHUNK_PAGES 4
#define STS_CHUNK_HEADER_SIZE offsetof(SharedTuplestoreChunk, data)
#define STS_CHUNK_DATA_SIZE (STS_CHUNK_PAGES * BLCKSZ - STS_CHUNK_HEADER_SIZE)
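
/*
 * For illustration: with the default BLCKSZ of 8192, a chunk occupies
 * 32768 bytes; assuming 4-byte ints and no padding in SharedTuplestoreChunk,
 * the header takes 8 bytes and STS_CHUNK_DATA_SIZE is 32760 bytes.
 */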

/* Chunk written to disk. */
typedef struct SharedTuplestoreChunk
{
	int			ntuples;		/* Number of tuples in this chunk. */
	int			overflow;		/* If overflow, how many including this one? */
	char		data[FLEXIBLE_ARRAY_MEMBER];
} SharedTuplestoreChunk;

/* Per-participant shared state. */
typedef struct SharedTuplestoreParticipant
{
	LWLock		lock;
	BlockNumber read_page;		/* Page number for next read. */
	BlockNumber npages;			/* Number of pages written. */
	bool		writing;		/* Used only for assertions. */
} SharedTuplestoreParticipant;

/* The control object that lives in shared memory. */
struct SharedTuplestore
{
	int			nparticipants;	/* Number of participants that can write. */
	int			flags;			/* Flag bits from SHARED_TUPLESTORE_XXX */
	size_t		meta_data_size; /* Size of per-tuple header. */
	char		name[NAMEDATALEN];	/* A name for this tuplestore. */

	/* Followed by per-participant shared state. */
	SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER];
};

/* Per-participant state that lives in backend-local memory. */
struct SharedTuplestoreAccessor
{
	int			participant;	/* My participant number. */
	SharedTuplestore *sts;		/* The shared state. */
	SharedFileSet *fileset;		/* The SharedFileSet holding files. */
	MemoryContext context;		/* Memory context for buffers. */

	/* State for reading. */
	int			read_participant;	/* The current participant to read from. */
	BufFile    *read_file;		/* The current file to read from. */
	int			read_ntuples_available; /* The number of tuples in chunk. */
	int			read_ntuples;	/* How many tuples have we read from chunk? */
	size_t		read_bytes;		/* How many bytes have we read from chunk? */
	char	   *read_buffer;	/* A buffer for loading tuples. */
	size_t		read_buffer_size;
	BlockNumber read_next_page; /* Lowest block we'll consider reading. */

	/* State for writing. */
	SharedTuplestoreChunk *write_chunk; /* Buffer for writing. */
	BufFile    *write_file;		/* The current file to write to. */
	BlockNumber write_page;		/* The next page to write to. */
	char	   *write_pointer;	/* Current write pointer within chunk. */
	char	   *write_end;		/* One past the end of the current chunk. */
};

static void sts_filename(char *name, SharedTuplestoreAccessor *accessor,
						 int participant);

/*
 * Return the amount of shared memory required to hold SharedTuplestore for a
 * given number of participants.
 */
size_t
sts_estimate(int participants)
{
	return offsetof(SharedTuplestore, participants) +
		sizeof(SharedTuplestoreParticipant) * participants;
}
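
/*
 * For example, a hypothetical caller placing the control object in a DSM
 * segment might reserve space along these lines (a sketch only; "pcxt" is
 * a ParallelContext assumed to exist in the caller):
 *
 *	shm_toc_estimate_chunk(&pcxt->estimator,
 *						   sts_estimate(pcxt->nworkers + 1));
 */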

/*
 * Initialize a SharedTuplestore in existing shared memory.  There must be
 * space for sts_estimate(participants) bytes.  If flags includes the value
 * SHARED_TUPLESTORE_SINGLE_PASS, the files may in future be removed more
 * eagerly (but this isn't yet implemented).
 *
 * Tuples that are stored may optionally carry a piece of fixed-size
 * meta-data which will be retrieved along with the tuple.  This is useful for
 * the hash values used in multi-batch hash joins, but could have other
 * applications.
 *
 * The caller must supply a SharedFileSet, which is essentially a directory
 * that will be cleaned up automatically, and a name which must be unique
 * across all SharedTuplestores created in the same SharedFileSet.
 */
SharedTuplestoreAccessor *
sts_initialize(SharedTuplestore *sts, int participants,
			   int my_participant_number,
			   size_t meta_data_size,
			   int flags,
			   SharedFileSet *fileset,
			   const char *name)
{
	SharedTuplestoreAccessor *accessor;
	int			i;

	Assert(my_participant_number < participants);

	sts->nparticipants = participants;
	sts->meta_data_size = meta_data_size;
	sts->flags = flags;

	if (strlen(name) > sizeof(sts->name) - 1)
		elog(ERROR, "SharedTuplestore name too long");
	strcpy(sts->name, name);

	/*
	 * Limit meta-data so it + tuple size always fits into a single chunk.
	 * sts_puttuple() and sts_read_tuple() could be made to support scenarios
	 * where that's not the case, but it's not currently required. If so,
	 * meta-data size probably should be made variable, too.
	 */
	if (meta_data_size + sizeof(uint32) >= STS_CHUNK_DATA_SIZE)
		elog(ERROR, "meta-data too long");

	for (i = 0; i < participants; ++i)
	{
		LWLockInitialize(&sts->participants[i].lock,
						 LWTRANCHE_SHARED_TUPLESTORE);
		sts->participants[i].read_page = 0;
		sts->participants[i].writing = false;
	}

	accessor = palloc0(sizeof(SharedTuplestoreAccessor));
	accessor->participant = my_participant_number;
	accessor->sts = sts;
	accessor->fileset = fileset;
	accessor->context = CurrentMemoryContext;

	return accessor;
}

/*
 * Attach to a SharedTuplestore that has been initialized by another backend,
 * so that this backend can read and write tuples.
 */
SharedTuplestoreAccessor *
sts_attach(SharedTuplestore *sts,
		   int my_participant_number,
		   SharedFileSet *fileset)
{
	SharedTuplestoreAccessor *accessor;

	Assert(my_participant_number < sts->nparticipants);

	accessor = palloc0(sizeof(SharedTuplestoreAccessor));
	accessor->participant = my_participant_number;
	accessor->sts = sts;
	accessor->fileset = fileset;
	accessor->context = CurrentMemoryContext;

	return accessor;
}

static void
sts_flush_chunk(SharedTuplestoreAccessor *accessor)
{
	size_t		size;

	size = STS_CHUNK_PAGES * BLCKSZ;
	BufFileWrite(accessor->write_file, accessor->write_chunk, size);
	memset(accessor->write_chunk, 0, size);
	accessor->write_pointer = &accessor->write_chunk->data[0];
	accessor->sts->participants[accessor->participant].npages +=
		STS_CHUNK_PAGES;
}

/*
 * Finish writing tuples.  This must be called by all backends that have
 * written data before any backend begins reading it.
 */
void
sts_end_write(SharedTuplestoreAccessor *accessor)
{
	if (accessor->write_file != NULL)
	{
		sts_flush_chunk(accessor);
		BufFileClose(accessor->write_file);
		pfree(accessor->write_chunk);
		accessor->write_chunk = NULL;
		accessor->write_file = NULL;
		accessor->sts->participants[accessor->participant].writing = false;
	}
}

/*
 * Prepare to rescan.  Only one participant must call this.  After it returns,
 * all participants may call sts_begin_parallel_scan() and then loop over
 * sts_parallel_scan_next().  This function must not be called concurrently
 * with a scan, and synchronization to avoid that is the caller's
 * responsibility.
 */
void
sts_reinitialize(SharedTuplestoreAccessor *accessor)
{
	int			i;

	/*
	 * Reset the shared read head for all participants' files.
	 */
	for (i = 0; i < accessor->sts->nparticipants; ++i)
	{
		accessor->sts->participants[i].read_page = 0;
	}
}

/*
 * Begin scanning the contents in parallel.
 */
void
sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
{
	int			i PG_USED_FOR_ASSERTS_ONLY;

	/* End any existing scan that was in progress. */
	sts_end_parallel_scan(accessor);

	/*
	 * Any backend that might have written into this shared tuplestore must
	 * have called sts_end_write(), so that all buffers are flushed and the
	 * files have stopped growing.
	 */
	for (i = 0; i < accessor->sts->nparticipants; ++i)
		Assert(!accessor->sts->participants[i].writing);

	/*
	 * We will start out reading the file that THIS backend wrote.  There may
	 * be some caching locality advantage to that.
	 */
	accessor->read_participant = accessor->participant;
	accessor->read_file = NULL;
	accessor->read_next_page = 0;
}

/*
 * Finish a parallel scan, freeing associated backend-local resources.
 */
void
sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
{
	/*
	 * Here we could delete all files if SHARED_TUPLESTORE_SINGLE_PASS, but
	 * we'd probably need a reference count of current parallel scanners so we
	 * could safely do it only when the reference count reaches zero.
	 */
	if (accessor->read_file != NULL)
	{
		BufFileClose(accessor->read_file);
		accessor->read_file = NULL;
	}
}

/*
 * Write a tuple.  If a meta-data size was provided to sts_initialize, then a
 * pointer to meta data of that size must be provided.
 */
void
sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
			 MinimalTuple tuple)
{
	size_t		size;

	/* Do we have our own file yet? */
	if (accessor->write_file == NULL)
	{
		SharedTuplestoreParticipant *participant;
		char		name[MAXPGPATH];

		/* Create one.  Only this backend will write into it. */
		sts_filename(name, accessor, accessor->participant);
		accessor->write_file = BufFileCreateShared(accessor->fileset, name);

		/* Set up the shared state for this backend's file. */
		participant = &accessor->sts->participants[accessor->participant];
		participant->writing = true;	/* for assertions only */
	}

	/* Do we have space? */
	size = accessor->sts->meta_data_size + tuple->t_len;
	if (accessor->write_pointer + size >= accessor->write_end)
	{
		if (accessor->write_chunk == NULL)
		{
			/* First time through.  Allocate chunk. */
			accessor->write_chunk = (SharedTuplestoreChunk *)
				MemoryContextAllocZero(accessor->context,
									   STS_CHUNK_PAGES * BLCKSZ);
			accessor->write_chunk->ntuples = 0;
			accessor->write_pointer = &accessor->write_chunk->data[0];
			accessor->write_end = (char *)
				accessor->write_chunk + STS_CHUNK_PAGES * BLCKSZ;
		}
		else
		{
			/* See if flushing helps. */
			sts_flush_chunk(accessor);
		}

		/* It may still not be enough in the case of a gigantic tuple. */
		if (accessor->write_pointer + size >= accessor->write_end)
		{
			size_t		written;

			/*
			 * We'll write the beginning of the oversized tuple, and then
			 * write the rest in some number of 'overflow' chunks.
			 *
			 * sts_initialize() verifies that the size of the tuple +
			 * meta-data always fits into a chunk. Because the chunk has been
			 * flushed above, we can be sure to have all of a chunk's usable
			 * space available.
			 */
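			/*
			 * Worked example (assuming BLCKSZ is 8192, so 32760 usable
			 * bytes per chunk, with 4 bytes of meta-data): a 100000-byte
			 * tuple has 32756 bytes written into the current chunk,
			 * leaving 67244 bytes for ceil(67244 / 32760) = 3 overflow
			 * chunks, whose headers carry overflow = 3, 2 and 1.
			 */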
			Assert(accessor->write_pointer + accessor->sts->meta_data_size +
				   sizeof(uint32) < accessor->write_end);

			/* Write the meta-data as one chunk. */
			if (accessor->sts->meta_data_size > 0)
				memcpy(accessor->write_pointer, meta_data,
					   accessor->sts->meta_data_size);

			/*
			 * Write as much of the tuple as we can fit. This includes the
			 * tuple's size at the start.
			 */
			written = accessor->write_end - accessor->write_pointer -
				accessor->sts->meta_data_size;
			memcpy(accessor->write_pointer + accessor->sts->meta_data_size,
				   tuple, written);
			++accessor->write_chunk->ntuples;
			size -= accessor->sts->meta_data_size;
			size -= written;
			/* Now write as many overflow chunks as we need for the rest. */
			while (size > 0)
			{
				size_t		written_this_chunk;

				sts_flush_chunk(accessor);

				/*
				 * How many overflow chunks to go?  This will allow readers to
				 * skip all of them at once instead of reading each one.
				 */
				accessor->write_chunk->overflow = (size + STS_CHUNK_DATA_SIZE - 1) /
					STS_CHUNK_DATA_SIZE;
				written_this_chunk =
					Min(accessor->write_end - accessor->write_pointer, size);
				memcpy(accessor->write_pointer, (char *) tuple + written,
					   written_this_chunk);
				accessor->write_pointer += written_this_chunk;
				size -= written_this_chunk;
				written += written_this_chunk;
			}
			return;
		}
	}

	/* Copy meta-data and tuple into buffer. */
	if (accessor->sts->meta_data_size > 0)
		memcpy(accessor->write_pointer, meta_data,
			   accessor->sts->meta_data_size);
	memcpy(accessor->write_pointer + accessor->sts->meta_data_size, tuple,
		   tuple->t_len);
	accessor->write_pointer += size;
	++accessor->write_chunk->ntuples;
}
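
/*
 * For example, a multi-batch hash join (per the comment above
 * sts_initialize()) might store each tuple's hash value as the meta-data;
 * a sketch of such a call, with names assumed for illustration:
 *
 *	sts_puttuple(accessor, &hashvalue, tuple);
 */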

static MinimalTuple
sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data)
{
	MinimalTuple tuple;
	uint32		size;
	size_t		remaining_size;
	size_t		this_chunk_size;
	char	   *destination;

	/*
	 * We'll keep track of bytes read from this chunk so that we can detect an
	 * overflowing tuple and switch to reading overflow pages.
	 */
	if (accessor->sts->meta_data_size > 0)
	{
		if (BufFileRead(accessor->read_file,
						meta_data,
						accessor->sts->meta_data_size) !=
			accessor->sts->meta_data_size)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read from shared tuplestore temporary file"),
					 errdetail_internal("Short read while reading meta-data.")));
		accessor->read_bytes += accessor->sts->meta_data_size;
	}
	if (BufFileRead(accessor->read_file,
					&size,
					sizeof(size)) != sizeof(size))
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not read from shared tuplestore temporary file"),
				 errdetail_internal("Short read while reading size.")));
	accessor->read_bytes += sizeof(size);
	if (size > accessor->read_buffer_size)
	{
		size_t		new_read_buffer_size;

		if (accessor->read_buffer != NULL)
			pfree(accessor->read_buffer);
		new_read_buffer_size = Max(size, accessor->read_buffer_size * 2);
		accessor->read_buffer =
			MemoryContextAlloc(accessor->context, new_read_buffer_size);
		accessor->read_buffer_size = new_read_buffer_size;
	}
	remaining_size = size - sizeof(uint32);
	this_chunk_size = Min(remaining_size,
						  BLCKSZ * STS_CHUNK_PAGES - accessor->read_bytes);
	destination = accessor->read_buffer + sizeof(uint32);
	if (BufFileRead(accessor->read_file,
					destination,
					this_chunk_size) != this_chunk_size)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not read from shared tuplestore temporary file"),
				 errdetail_internal("Short read while reading tuple.")));
	accessor->read_bytes += this_chunk_size;
	remaining_size -= this_chunk_size;
	destination += this_chunk_size;
	++accessor->read_ntuples;

	/* Check if we need to read any overflow chunks. */
	while (remaining_size > 0)
	{
		/* We are now positioned at the start of an overflow chunk. */
		SharedTuplestoreChunk chunk_header;

		if (BufFileRead(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE) !=
			STS_CHUNK_HEADER_SIZE)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read from shared tuplestore temporary file"),
					 errdetail_internal("Short read while reading overflow chunk header.")));
		accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
		if (chunk_header.overflow == 0)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("unexpected chunk in shared tuplestore temporary file"),
					 errdetail_internal("Expected overflow chunk.")));
		accessor->read_next_page += STS_CHUNK_PAGES;
		this_chunk_size = Min(remaining_size,
							  BLCKSZ * STS_CHUNK_PAGES -
							  STS_CHUNK_HEADER_SIZE);
		if (BufFileRead(accessor->read_file,
						destination,
						this_chunk_size) != this_chunk_size)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read from shared tuplestore temporary file"),
					 errdetail_internal("Short read while reading tuple.")));
		accessor->read_bytes += this_chunk_size;
		remaining_size -= this_chunk_size;
		destination += this_chunk_size;

		/*
		 * These will be used to count regular tuples following the oversized
		 * tuple that spilled into this overflow chunk.
		 */
		accessor->read_ntuples = 0;
		accessor->read_ntuples_available = chunk_header.ntuples;
	}

	tuple = (MinimalTuple) accessor->read_buffer;
	tuple->t_len = size;

	return tuple;
}

/*
 * Get the next tuple in the current parallel scan.
 */
MinimalTuple
sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
{
	SharedTuplestoreParticipant *p;
	BlockNumber read_page;
	bool		eof;

	for (;;)
	{
		/* Can we read more tuples from the current chunk? */
		if (accessor->read_ntuples < accessor->read_ntuples_available)
			return sts_read_tuple(accessor, meta_data);

		/* Find the location of a new chunk to read. */
		p = &accessor->sts->participants[accessor->read_participant];

		LWLockAcquire(&p->lock, LW_EXCLUSIVE);
		/* We can skip directly past overflow pages we know about. */
		if (p->read_page < accessor->read_next_page)
			p->read_page = accessor->read_next_page;
		eof = p->read_page >= p->npages;
		if (!eof)
		{
			/* Claim the next chunk. */
			read_page = p->read_page;
			/* Advance the read head for the next reader. */
			p->read_page += STS_CHUNK_PAGES;
			accessor->read_next_page = p->read_page;
		}
		LWLockRelease(&p->lock);

		if (!eof)
		{
			SharedTuplestoreChunk chunk_header;
			size_t		nread;

			/* Make sure we have the file open. */
			if (accessor->read_file == NULL)
			{
				char		name[MAXPGPATH];

				sts_filename(name, accessor, accessor->read_participant);
				accessor->read_file =
					BufFileOpenShared(accessor->fileset, name);
			}

			/* Seek and load the chunk header. */
			if (BufFileSeekBlock(accessor->read_file, read_page) != 0)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not seek block %u in shared tuplestore temporary file",
								read_page)));
			nread = BufFileRead(accessor->read_file, &chunk_header,
								STS_CHUNK_HEADER_SIZE);
			if (nread != STS_CHUNK_HEADER_SIZE)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not read from shared tuplestore temporary file: read only %zu of %zu bytes",
								nread, STS_CHUNK_HEADER_SIZE)));

			/*
			 * If this is an overflow chunk, we skip it and any following
			 * overflow chunks all at once.
			 */
			if (chunk_header.overflow > 0)
			{
				accessor->read_next_page = read_page +
					chunk_header.overflow * STS_CHUNK_PAGES;
				continue;
			}

			accessor->read_ntuples = 0;
			accessor->read_ntuples_available = chunk_header.ntuples;
			accessor->read_bytes = STS_CHUNK_HEADER_SIZE;

			/* Go around again, so we can get a tuple from this chunk. */
		}
		else
		{
			if (accessor->read_file != NULL)
			{
				BufFileClose(accessor->read_file);
				accessor->read_file = NULL;
			}

			/*
			 * Try the next participant's file.  If we've gone full circle,
			 * we're done.
			 */
			accessor->read_participant = (accessor->read_participant + 1) %
				accessor->sts->nparticipants;
			if (accessor->read_participant == accessor->participant)
				break;
			accessor->read_next_page = 0;

			/* Go around again, so we can get a chunk from this file. */
		}
	}

	return NULL;
}
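
/*
 * A minimal reader loop, sketched under the assumption that sizeof(uint32)
 * bytes of meta-data were requested at initialization time:
 *
 *	MinimalTuple tuple;
 *	uint32		hashvalue;
 *
 *	sts_begin_parallel_scan(accessor);
 *	while ((tuple = sts_parallel_scan_next(accessor, &hashvalue)) != NULL)
 *	{
 *		... use tuple and hashvalue ...
 *	}
 *	sts_end_parallel_scan(accessor);
 */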

/*
 * Create the name used for the BufFile that a given participant will write.
 */
static void
sts_filename(char *name, SharedTuplestoreAccessor *accessor, int participant)
{
	snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant);
}