1 /*-------------------------------------------------------------------------
2  *
3  * blscan.c
4  *		Bloom index scan functions.
5  *
6  * Copyright (c) 2016-2017, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  *	  contrib/bloom/blscan.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #include "access/relscan.h"
16 #include "pgstat.h"
17 #include "miscadmin.h"
18 #include "storage/bufmgr.h"
19 #include "storage/lmgr.h"
20 #include "utils/memutils.h"
21 #include "utils/rel.h"
22 
23 #include "bloom.h"
24 
25 /*
26  * Begin scan of bloom index.
27  */
28 IndexScanDesc
blbeginscan(Relation r,int nkeys,int norderbys)29 blbeginscan(Relation r, int nkeys, int norderbys)
30 {
31 	IndexScanDesc scan;
32 	BloomScanOpaque so;
33 
34 	scan = RelationGetIndexScan(r, nkeys, norderbys);
35 
36 	so = (BloomScanOpaque) palloc(sizeof(BloomScanOpaqueData));
37 	initBloomState(&so->state, scan->indexRelation);
38 	so->sign = NULL;
39 
40 	scan->opaque = so;
41 
42 	return scan;
43 }
44 
45 /*
46  * Rescan a bloom index.
47  */
48 void
blrescan(IndexScanDesc scan,ScanKey scankey,int nscankeys,ScanKey orderbys,int norderbys)49 blrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
50 		 ScanKey orderbys, int norderbys)
51 {
52 	BloomScanOpaque so = (BloomScanOpaque) scan->opaque;
53 
54 	if (so->sign)
55 		pfree(so->sign);
56 	so->sign = NULL;
57 
58 	if (scankey && scan->numberOfKeys > 0)
59 	{
60 		memmove(scan->keyData, scankey,
61 				scan->numberOfKeys * sizeof(ScanKeyData));
62 	}
63 }
64 
65 /*
66  * End scan of bloom index.
67  */
68 void
blendscan(IndexScanDesc scan)69 blendscan(IndexScanDesc scan)
70 {
71 	BloomScanOpaque so = (BloomScanOpaque) scan->opaque;
72 
73 	if (so->sign)
74 		pfree(so->sign);
75 	so->sign = NULL;
76 }
77 
78 /*
79  * Insert all matching tuples into a bitmap.
80  */
81 int64
blgetbitmap(IndexScanDesc scan,TIDBitmap * tbm)82 blgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
83 {
84 	int64		ntids = 0;
85 	BlockNumber blkno = BLOOM_HEAD_BLKNO,
86 				npages;
87 	int			i;
88 	BufferAccessStrategy bas;
89 	BloomScanOpaque so = (BloomScanOpaque) scan->opaque;
90 
91 	if (so->sign == NULL)
92 	{
93 		/* New search: have to calculate search signature */
94 		ScanKey		skey = scan->keyData;
95 
96 		so->sign = palloc0(sizeof(BloomSignatureWord) * so->state.opts.bloomLength);
97 
98 		for (i = 0; i < scan->numberOfKeys; i++)
99 		{
100 			/*
101 			 * Assume bloom-indexable operators to be strict, so nothing could
102 			 * be found for NULL key.
103 			 */
104 			if (skey->sk_flags & SK_ISNULL)
105 			{
106 				pfree(so->sign);
107 				so->sign = NULL;
108 				return 0;
109 			}
110 
111 			/* Add next value to the signature */
112 			signValue(&so->state, so->sign, skey->sk_argument,
113 					  skey->sk_attno - 1);
114 
115 			skey++;
116 		}
117 	}
118 
119 	/*
120 	 * We're going to read the whole index. This is why we use appropriate
121 	 * buffer access strategy.
122 	 */
123 	bas = GetAccessStrategy(BAS_BULKREAD);
124 	npages = RelationGetNumberOfBlocks(scan->indexRelation);
125 
126 	for (blkno = BLOOM_HEAD_BLKNO; blkno < npages; blkno++)
127 	{
128 		Buffer		buffer;
129 		Page		page;
130 
131 		buffer = ReadBufferExtended(scan->indexRelation, MAIN_FORKNUM,
132 									blkno, RBM_NORMAL, bas);
133 
134 		LockBuffer(buffer, BUFFER_LOCK_SHARE);
135 		page = BufferGetPage(buffer);
136 		TestForOldSnapshot(scan->xs_snapshot, scan->indexRelation, page);
137 
138 		if (!PageIsNew(page) && !BloomPageIsDeleted(page))
139 		{
140 			OffsetNumber offset,
141 						maxOffset = BloomPageGetMaxOffset(page);
142 
143 			for (offset = 1; offset <= maxOffset; offset++)
144 			{
145 				BloomTuple *itup = BloomPageGetTuple(&so->state, page, offset);
146 				bool		res = true;
147 
148 				/* Check index signature with scan signature */
149 				for (i = 0; i < so->state.opts.bloomLength; i++)
150 				{
151 					if ((itup->sign[i] & so->sign[i]) != so->sign[i])
152 					{
153 						res = false;
154 						break;
155 					}
156 				}
157 
158 				/* Add matching tuples to bitmap */
159 				if (res)
160 				{
161 					tbm_add_tuples(tbm, &itup->heapPtr, 1, true);
162 					ntids++;
163 				}
164 			}
165 		}
166 
167 		UnlockReleaseBuffer(buffer);
168 		CHECK_FOR_INTERRUPTS();
169 	}
170 	FreeAccessStrategy(bas);
171 
172 	return ntids;
173 }
174