1 /*-------------------------------------------------------------------------
2  *
3  * cstore_fdw.h
4  *
5  * Type and function declarations for CStore foreign data wrapper.
6  *
7  * Copyright (c) 2016, Citus Data, Inc.
8  *
9  * $Id$
10  *
11  *-------------------------------------------------------------------------
12  */
13 
14 #ifndef CSTORE_FDW_H
15 #define CSTORE_FDW_H
16 
17 #include "access/tupdesc.h"
18 #include "fmgr.h"
19 #include "catalog/pg_am.h"
20 #include "catalog/pg_foreign_server.h"
21 #include "catalog/pg_foreign_table.h"
22 #include "lib/stringinfo.h"
23 #include "utils/rel.h"
24 
25 
/* Names of the options recognized by cstore_fdw */
#define OPTION_NAME_FILENAME          "filename"
#define OPTION_NAME_COMPRESSION_TYPE  "compression"
#define OPTION_NAME_STRIPE_ROW_COUNT  "stripe_row_count"
#define OPTION_NAME_BLOCK_ROW_COUNT   "block_row_count"

/* Values used when an option is not specified */
#define DEFAULT_COMPRESSION_TYPE  COMPRESSION_NONE
#define DEFAULT_STRIPE_ROW_COUNT  150000
#define DEFAULT_BLOCK_ROW_COUNT   10000

/* Bounds for the row count options */
#define STRIPE_ROW_COUNT_MINIMUM  1000
#define STRIPE_ROW_COUNT_MAXIMUM  10000000
#define BLOCK_ROW_COUNT_MINIMUM   1000
#define BLOCK_ROW_COUNT_MAXIMUM   100000

/* Textual names of the compression types */
#define COMPRESSION_STRING_NONE            "none"
#define COMPRESSION_STRING_PG_LZ           "pglz"
#define COMPRESSION_STRING_DELIMITED_LIST  "none, pglz"

/* Signature identifying a cstore file: magic string plus format version */
#define CSTORE_MAGIC_NUMBER   "citus_cstore"
#define CSTORE_VERSION_MAJOR  1
#define CSTORE_VERSION_MINOR  7

/* Assorted constants */
#define CSTORE_FDW_NAME                "cstore_fdw"
#define CSTORE_FOOTER_FILE_SUFFIX      ".footer"
#define CSTORE_TEMP_FILE_SUFFIX        ".tmp"
#define CSTORE_TUPLE_COST_MULTIPLIER   10
#define CSTORE_POSTSCRIPT_SIZE_LENGTH  1
#define CSTORE_POSTSCRIPT_SIZE_MAX     256

/* Extension and table that describe how distributed tables are partitioned */
#define CITUS_EXTENSION_NAME        "citus"
#define CITUS_PARTITION_TABLE_NAME  "pg_dist_partition"

/* Symbolic attribute numbers for the columns of pg_dist_partition */
#define ATTR_NUM_PARTITION_RELATION_ID  1
#define ATTR_NUM_PARTITION_TYPE         2
#define ATTR_NUM_PARTITION_KEY          3
69 
70 
71 /*
72  * CStoreValidOption keeps an option name and a context. When an option is passed
73  * into cstore_fdw objects (server and foreign table), we compare this option's
74  * name and context against those of valid options.
75  */
76 typedef struct CStoreValidOption
77 {
78 	const char *optionName;
79 	Oid optionContextId;
80 
81 } CStoreValidOption;
82 
83 
84 /* Array of options that are valid for cstore_fdw */
85 static const uint32 ValidOptionCount = 4;
86 static const CStoreValidOption ValidOptionArray[] =
87 {
88 	/* foreign table options */
89 	{ OPTION_NAME_FILENAME, ForeignTableRelationId },
90 	{ OPTION_NAME_COMPRESSION_TYPE, ForeignTableRelationId },
91 	{ OPTION_NAME_STRIPE_ROW_COUNT, ForeignTableRelationId },
92 	{ OPTION_NAME_BLOCK_ROW_COUNT, ForeignTableRelationId }
93 };
94 
95 
/*
 * Enumeration of the compression methods of a cstore file.
 * COMPRESSION_TYPE_INVALID is a negative sentinel. COMPRESSION_COUNT is not a
 * method itself: it follows the last method, so it takes the next value after
 * COMPRESSION_PG_LZ.
 */
typedef enum
{
	COMPRESSION_TYPE_INVALID = -1,
	COMPRESSION_NONE = 0,
	COMPRESSION_PG_LZ = 1,

	/* keep this entry last */
	COMPRESSION_COUNT
} CompressionType;
106 
107 
108 /*
109  * CStoreFdwOptions holds the option values to be used when reading or writing
110  * a cstore file. To resolve these values, we first check foreign table's options,
111  * and if not present, we then fall back to the default values specified above.
112  */
113 typedef struct CStoreFdwOptions
114 {
115 	char *filename;
116 	CompressionType compressionType;
117 	uint64 stripeRowCount;
118 	uint32 blockRowCount;
119 
120 } CStoreFdwOptions;
121 
122 
123 /*
124  * StripeMetadata represents information about a stripe. This information is
125  * stored in the cstore file's footer.
126  */
127 typedef struct StripeMetadata
128 {
129 	uint64 fileOffset;
130 	uint64 skipListLength;
131 	uint64 dataLength;
132 	uint64 footerLength;
133 
134 } StripeMetadata;
135 
136 
137 /* TableFooter represents the footer of a cstore file. */
138 typedef struct TableFooter
139 {
140 	List *stripeMetadataList;
141 	uint64 blockRowCount;
142 
143 } TableFooter;
144 
145 
146 /* ColumnBlockSkipNode contains statistics for a ColumnBlockData. */
147 typedef struct ColumnBlockSkipNode
148 {
149 	/* statistics about values of a column block */
150 	bool hasMinMax;
151 	Datum minimumValue;
152 	Datum maximumValue;
153 	uint64 rowCount;
154 
155 	/*
156 	 * Offsets and sizes of value and exists streams in the column data.
157 	 * These enable us to skip reading suppressed row blocks, and start reading
158 	 * a block without reading previous blocks.
159 	 */
160 	uint64 valueBlockOffset;
161 	uint64 valueLength;
162 	uint64 existsBlockOffset;
163 	uint64 existsLength;
164 
165 	CompressionType valueCompressionType;
166 
167 } ColumnBlockSkipNode;
168 
169 
170 /*
171  * StripeSkipList can be used for skipping row blocks. It contains a column block
172  * skip node for each block of each column. blockSkipNodeArray[column][block]
173  * is the entry for the specified column block.
174  */
175 typedef struct StripeSkipList
176 {
177 	ColumnBlockSkipNode **blockSkipNodeArray;
178 	uint32 columnCount;
179 	uint32 blockCount;
180 
181 } StripeSkipList;
182 
183 
184 /*
185  * ColumnBlockData represents a block of data in a column. valueArray stores
186  * the values of data, and existsArray stores whether a value is present.
187  * valueBuffer is used to store (uncompressed) serialized values
188  * referenced by Datum's in valueArray. It is only used for by-reference Datum's.
189  * There is a one-to-one correspondence between valueArray and existsArray.
190  */
191 typedef struct ColumnBlockData
192 {
193 	bool *existsArray;
194 	Datum *valueArray;
195 
196 	/* valueBuffer keeps actual data for type-by-reference datums from valueArray. */
197 	StringInfo valueBuffer;
198 
199 } ColumnBlockData;
200 
201 
202 /*
203  * ColumnBlockBuffers represents a block of serialized data in a column.
204  * valueBuffer stores the serialized values of data, and existsBuffer stores
205  * serialized value of presence information. valueCompressionType contains
206  * compression type if valueBuffer is compressed. Finally rowCount has
207  * the number of rows in this block.
208  */
209 typedef struct ColumnBlockBuffers
210 {
211 	StringInfo existsBuffer;
212 	StringInfo valueBuffer;
213 	CompressionType valueCompressionType;
214 
215 } ColumnBlockBuffers;
216 
217 
218 /*
219  * ColumnBuffers represents data buffers for a column in a row stripe. Each
220  * column is made of multiple column blocks.
221  */
222 typedef struct ColumnBuffers
223 {
224 	ColumnBlockBuffers **blockBuffersArray;
225 
226 } ColumnBuffers;
227 
228 
229 /* StripeBuffers represents data for a row stripe in a cstore file. */
230 typedef struct StripeBuffers
231 {
232 	uint32 columnCount;
233 	uint32 rowCount;
234 	ColumnBuffers **columnBuffersArray;
235 
236 } StripeBuffers;
237 
238 
239 /*
240  * StripeFooter represents a stripe's footer. In this footer, we keep three
241  * arrays of sizes. The number of elements in each of the arrays is equal
242  * to the number of columns.
243  */
244 typedef struct StripeFooter
245 {
246 	uint32 columnCount;
247 	uint64 *skipListSizeArray;
248 	uint64 *existsSizeArray;
249 	uint64 *valueSizeArray;
250 
251 } StripeFooter;
252 
253 
254 /* TableReadState represents state of a cstore file read operation. */
255 typedef struct TableReadState
256 {
257 	FILE *tableFile;
258 	TableFooter *tableFooter;
259 	TupleDesc tupleDescriptor;
260 
261 	/*
262 	 * List of Var pointers for columns in the query. We use this both for
263 	 * getting vector of projected columns, and also when we want to build
264 	 * base constraint to find selected row blocks.
265 	 */
266 	List *projectedColumnList;
267 
268 	List *whereClauseList;
269 	MemoryContext stripeReadContext;
270 	StripeBuffers *stripeBuffers;
271 	uint32 readStripeCount;
272 	uint64 stripeReadRowCount;
273 	ColumnBlockData **blockDataArray;
274 	int32 deserializedBlockIndex;
275 
276 } TableReadState;
277 
278 
279 /* TableWriteState represents state of a cstore file write operation. */
280 typedef struct TableWriteState
281 {
282 	FILE *tableFile;
283 	TableFooter *tableFooter;
284 	StringInfo tableFooterFilename;
285 	CompressionType compressionType;
286 	TupleDesc tupleDescriptor;
287 	FmgrInfo **comparisonFunctionArray;
288 	uint64 currentFileOffset;
289 	Relation relation;
290 
291 	MemoryContext stripeWriteContext;
292 	StripeBuffers *stripeBuffers;
293 	StripeSkipList *stripeSkipList;
294 	uint32 stripeMaxRowCount;
295 	ColumnBlockData **blockDataArray;
296 	/*
297 	 * compressionBuffer buffer is used as temporary storage during
298 	 * data value compression operation. It is kept here to minimize
299 	 * memory allocations. It lives in stripeWriteContext and gets
300 	 * deallocated when memory context is reset.
301 	 */
302 	StringInfo compressionBuffer;
303 
304 } TableWriteState;
305 
306 /* Function declarations for extension loading and unloading */
307 extern void _PG_init(void);
308 extern void _PG_fini(void);
309 
310 /* event trigger function declarations */
311 extern Datum cstore_ddl_event_end_trigger(PG_FUNCTION_ARGS);
312 
313 /* Function declarations for utility UDFs */
314 extern Datum cstore_table_size(PG_FUNCTION_ARGS);
315 extern Datum cstore_clean_table_resources(PG_FUNCTION_ARGS);
316 
317 /* Function declarations for foreign data wrapper */
318 extern Datum cstore_fdw_handler(PG_FUNCTION_ARGS);
319 extern Datum cstore_fdw_validator(PG_FUNCTION_ARGS);
320 
321 /* Function declarations for writing to a cstore file */
322 extern TableWriteState * CStoreBeginWrite(const char *filename,
323 										  CompressionType compressionType,
324 										  uint64 stripeMaxRowCount,
325 										  uint32 blockRowCount,
326 										  TupleDesc tupleDescriptor);
327 extern void CStoreWriteRow(TableWriteState *state, Datum *columnValues,
328 						   bool *columnNulls);
329 extern void CStoreEndWrite(TableWriteState * state);
330 
331 /* Function declarations for reading from a cstore file */
332 extern TableReadState * CStoreBeginRead(const char *filename, TupleDesc tupleDescriptor,
333 										List *projectedColumnList, List *qualConditions);
334 extern TableFooter * CStoreReadFooter(StringInfo tableFooterFilename);
335 extern bool CStoreReadFinished(TableReadState *state);
336 extern bool CStoreReadNextRow(TableReadState *state, Datum *columnValues,
337 							  bool *columnNulls);
338 extern void CStoreEndRead(TableReadState *state);
339 
340 /* Function declarations for common functions */
341 extern FmgrInfo * GetFunctionInfoOrNull(Oid typeId, Oid accessMethodId,
342 										int16 procedureId);
343 extern ColumnBlockData ** CreateEmptyBlockDataArray(uint32 columnCount, bool *columnMask,
344 													uint32 blockRowCount);
345 extern void FreeColumnBlockDataArray(ColumnBlockData **blockDataArray,
346 									 uint32 columnCount);
347 extern uint64 CStoreTableRowCount(const char *filename);
348 extern bool CompressBuffer(StringInfo inputBuffer, StringInfo outputBuffer,
349 						   CompressionType compressionType);
350 extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType);
351 
352 
353 #endif   /* CSTORE_FDW_H */
354