1 #ifndef RS_RESULT_PROCESSOR_H_
2 #define RS_RESULT_PROCESSOR_H_
3
4 #include "redisearch.h"
5 #include "sortable.h"
6 #include "value.h"
7 #include "concurrent_ctx.h"
8 #include "search_ctx.h"
9 #include "index_iterator.h"
10 #include "search_options.h"
11 #include "rlookup.h"
12 #include "extension.h"
13 #include "score_explain.h"
14
15 #ifdef __cplusplus
16 extern "C" {
17 #endif
/********************************************************************************
 * Result Processor Chain
 *
 * We use a chain of result processors to sort, score, filter and page the results coming from the
 * index.
 *
 * The index iterator tree is responsible for extracting results from the index, and the processor
 * chain is responsible for processing those results and preparing them for the user.
 * The processors expose an iterator interface, adding values to SearchResult objects.
 *
 * SearchResult objects contain all the data needed for a search result - from docId and score, to
 * the actual fields loaded from Redis.
 *
 * Processors can add more fields, rewrite them, change the score, etc.
 * The query plan builds the chain based on the request, and then the chain just processes the
 * results.
 *
 ********************************************************************************/
36
/* Query processing state, stored in QueryIterator::state */
typedef enum {
  // The query is running.
  QITR_S_RUNNING,
  // The query was aborted: processing stops.
  QITR_S_ABORTED,

  // TimedOut state differs from aborted in that it lets the processors drain their accumulated
  // results instead of stopping in our tracks and returning nothing.
  QITR_S_TIMEDOUT
} QITRState;
46
/* Kind of a result processor; stored in ResultProcessor::type and printable via RPTypeToString(). */
typedef enum {
  RP_INDEX,
  RP_LOADER,
  RP_SCORER,
  RP_SORTER,
  RP_COUNTER,
  RP_PAGER_LIMITER,
  RP_HIGHLIGHTER,
  RP_GROUP,
  RP_PROJECTOR,
  RP_FILTER,
  RP_PROFILE,
  RP_NETWORK,
  // Sentinel: equals the number of processor types listed above (values start at 0).
  RP_MAX,
} ResultProcessorType;
62
// Forward declarations: QueryIterator below holds pointers to ResultProcessor before that struct
// is defined further down in this header.
struct ResultProcessor;
struct RLookup;
65
/*
 * State shared by every processor of a result-processor chain. Each ResultProcessor points back
 * to this object through its `parent` field. QueryProcessingCtx is an alias for the same type.
 */
typedef struct {
  // First processor
  struct ResultProcessor *rootProc;

  // Last processor
  struct ResultProcessor *endProc;

  // Concurrent search context for thread switching
  ConcurrentSearchCtx *conc;

  // Contains our spec (see RP_SPEC())
  RedisSearchCtx *sctx;

  // the minimal score applicable for a result. It can be used to optimize the scorers
  double minScore;

  // the total results found in the query, incremented by the root processors and decremented by
  // others who might disqualify results
  uint32_t totalResults;

  // Object which contains the error (consulted when Next() returns RS_RESULT_ERROR)
  QueryError *err;

  // the state - used for aborting queries
  QITRState state;

  // NOTE(review): presumably the time at which query processing started; it is not written
  // anywhere in this header - confirm against the code that fills it in.
  struct timespec startTime;
} QueryIterator, QueryProcessingCtx;
94
// Chain management for a QueryIterator (implemented outside this header).
// NOTE(review): descriptions below are inferred from the names - confirm in the implementation.
// Returns the index iterator that feeds the chain.
IndexIterator *QITR_GetRootFilter(QueryIterator *it);
// Adds `rp` to the chain owned by `it` (presumably becoming the new endProc).
void QITR_PushRP(QueryIterator *it, struct ResultProcessor *rp);
// Frees the processors of the chain.
void QITR_FreeChain(QueryIterator *qitr);
98
/*
 * SearchResult - the object the whole processing chain works on.
 * It has the indexResult, which is what the index scan brought - scores, vectors, flags, etc.
 *
 * And a list of fields loaded by the chain (kept in `rowdata`) - currently only by the loader,
 * but possibly by aggregators later on.
 */
typedef struct {
  // Id of the document this result refers to
  t_docId docId;

  // not all results have score - TBD
  double score;
  // Explanation of how `score` was obtained.
  // NOTE(review): presumably only filled when score explanation is requested - confirm.
  RSScoreExplain *scoreExplain;

  // Metadata of the document `docId`
  RSDocumentMetadata *dmd;

  // index result should cover what you need for highlighting,
  // but we will add a method to duplicate index results to make
  // them thread safe
  RSIndexResult *indexResult;

  // Row data. Use RLookup_* functions to access
  RLookupRow rowdata;
} SearchResult;
123
/* Result processor return codes */

/** Possible return values from Next() */
typedef enum {
  // Result is filled with valid data
  RS_RESULT_OK = 0,
  // Result is empty, and the last result has already been returned.
  RS_RESULT_EOF,
  // Execution paused due to rate limiting (or manual pause from ext. thread??)
  RS_RESULT_PAUSED,
  // Execution halted because of timeout
  RS_RESULT_TIMEDOUT,
  // Aborted because of an error. The query's error object (parent->err, a QueryError) should
  // have more information.
  RS_RESULT_ERROR,
  // Not a return code per se, but a marker signifying the end of the 'public'
  // return codes. Implementations can use this for extensions.
  RS_RESULT_MAX
} RPStatus;
143
/**
 * Result processor structure. This should be "subclassed" by the actual
 * implementations (i.e. embedded as the first member of a larger struct).
 * Processors form a singly linked chain through `upstream`, and all of them share
 * the QueryIterator pointed to by `parent`.
 */
typedef struct ResultProcessor {
  // Reference to the parent structure
  QueryIterator *parent;

  // Previous result processor in the chain
  struct ResultProcessor *upstream;

  // Type of result processor
  ResultProcessorType type;

  /**
   * Populates the result pointed to by `res`. The existing data of `res` is
   * not read, so it is the responsibility of the caller to ensure that there
   * are no refcount leaks in the structure.
   *
   * Users can use SearchResult_Clear() to reset the structure without freeing
   * it.
   *
   * The populated structure (if RS_RESULT_OK is returned) does contain references
   * to document data. Callers *MUST* ensure they are eventually freed.
   *
   * Returns one of the RPStatus values (declared as `int`).
   */
  int (*Next)(struct ResultProcessor *self, SearchResult *res);

  /** Frees the processor and any internal data related to it. */
  void (*Free)(struct ResultProcessor *self);
} ResultProcessor;
174
// Get the index spec from the result processor: expands to `rpctx->parent->sctx->spec`, the spec
// of the search context held by the processor's QueryIterator. `rpctx` is evaluated once.
#define RP_SPEC(rpctx) ((rpctx)->parent->sctx->spec)
177
/**
 * Resets the search result so that it may be reused (e.g. passed to another Next() call).
 * Internal caches are reset but not freed.
 */
void SearchResult_Clear(SearchResult *r);

/**
 * Clears the search result and also frees its internals, including internal
 * caches. Use this function if `r` will not be used again.
 */
void SearchResult_Destroy(SearchResult *r);
189
// Creates a processor that reads results from the index iterator `itr` (presumably the chain's
// RP_INDEX root). NOTE(review): `timeoutTime` looks like an absolute CLOCK_MONOTONIC_RAW deadline
// of the kind updateTimeout() produces and TimedOut() checks - confirm in the implementation.
ResultProcessor *RPIndexIterator_New(IndexIterator *itr, struct timespec timeoutTime);

// Creates a scoring processor using the scoring-function context `funcs` and the scoring
// arguments `fnargs`.
ResultProcessor *RPScorer_New(const ExtScoringFunctionCtx *funcs,
                              const ScoringFunctionArgs *fnargs);
194
/** Functions abstracting the sortmap. Hides the bitwise logic.
 *
 * A sortmap is a 64-bit mask: bit `pos` set means sort key `pos` is ascending, cleared means
 * descending. SORTASCMAP_INIT has every bit set (all keys ascending). `pos` must be below 64,
 * because the macros shift a 64-bit value by `pos`.
 * NOTE(review): SORTASCMAP_MAXFIELDS is presumably the maximum number of sort keys; these macros
 * do not enforce it.
 */
#define SORTASCMAP_INIT 0xFFFFFFFFFFFFFFFF
#define SORTASCMAP_MAXFIELDS 8
#define SORTASCMAP_SETASC(mm, pos) ((mm) |= (1LLU << (pos)))
#define SORTASCMAP_SETDESC(mm, pos) ((mm) &= ~(1LLU << (pos)))
#define SORTASCMAP_GETASC(mm, pos) ((mm) & (1LLU << (pos)))
// Debug helper: dumps the first `n` entries of sortmap `v` (inferred from the name).
void SortAscMap_Dump(uint64_t v, size_t n);
202
// Creates a sorter ordering results by the `nkeys` lookup keys in `keys`; `ascendingMap` is a
// sortmap (see SORTASCMAP_*) giving each key's direction. NOTE(review): `maxresults` presumably
// caps how many results the sorter retains - confirm.
ResultProcessor *RPSorter_NewByFields(size_t maxresults, const RLookupKey **keys, size_t nkeys,
                                      uint64_t ascendingMap);

// Creates a sorter ordering results by SearchResult::score (see note on `maxresults` above).
ResultProcessor *RPSorter_NewByScore(size_t maxresults);

// Creates a pager (RP_PAGER_LIMITER). NOTE(review): presumably skips the first `offset` results
// and passes at most `limit` - confirm.
ResultProcessor *RPPager_New(size_t offset, size_t limit);
209
/*******************************************************************************************************************
 * Loading Processor
 *
 * This processor simply takes the search results, and based on the request parameters, loads the
 * relevant fields for the results that need to be displayed to the user, from Redis.
 *
 * It fills each result's row data (SearchResult::rowdata) with values corresponding to the
 * requested return fields.
 *
 * NOTE(review): `keys`/`nkeys` are presumably the keys of `lk` to load - confirm in the
 * implementation.
 *******************************************************************************************************************/
ResultProcessor *RPLoader_New(RLookup *lk, const RLookupKey **keys, size_t nkeys);
220
/** Creates a new Highlight processor.
 * NOTE(review): presumably highlights the fields in `fields` according to `searchopts`, resolving
 * them through `lookup` - confirm in the implementation. */
ResultProcessor *RPHighlighter_New(const RSSearchOptions *searchopts, const FieldList *fields,
                                   const RLookup *lookup);
224
// Debug helper: dumps the processor chain starting at `rp` (inferred from the name - confirm).
void RP_DumpChain(const ResultProcessor *rp);
226
227
/*******************************************************************************************************************
 * Profiling Processor
 *
 * This processor collects time and count info about the performance of its upstream RP.
 * `rp` is presumably that upstream processor and `qiter` the chain it belongs to. The collected
 * values can be read with RPProfile_GetClock() and RPProfile_GetCount().
 *
 *******************************************************************************************************************/
ResultProcessor *RPProfile_New(ResultProcessor *rp, QueryIterator *qiter);
235
236
237 /*******************************************************************************************************************
238 * Counter Processor
239 *
240 * This processor counts the search results.
241 *
242 *******************************************************************************************************************/
243 ResultProcessor *RPCounter_New();
244
245 /*****************************************
246 * Timeout API
247 ****************************************/
248
/**
 * Returns non-zero when time `a` is at or after time `b` (a >= b).
 * Seconds are compared first; nanoseconds break the tie only when the seconds are equal.
 * Both values must be normalized (0 <= tv_nsec < 1000000000). Neither argument is modified.
 */
static inline int rs_timer_ge(const struct timespec *a, const struct timespec *b) {
  if (a->tv_sec != b->tv_sec) {
    return a->tv_sec > b->tv_sec;
  }
  return a->tv_nsec >= b->tv_nsec;
}
255
/**
 * Computes `*result = *a + *b`.
 * Both inputs must be normalized (0 <= tv_nsec < 1000000000); the result is then normalized too.
 * `result` may point to the same object as `a` or `b`.
 */
static inline void rs_timeradd(const struct timespec *a, const struct timespec *b,
                               struct timespec *result) {
  result->tv_sec = a->tv_sec + b->tv_sec;
  result->tv_nsec = a->tv_nsec + b->tv_nsec;
  // Each nanosecond part is below one second, so at most one second can carry over.
  if (result->tv_nsec >= 1000000000) {
    result->tv_sec += 1;
    result->tv_nsec -= 1000000000;
  }
}
264
/**
 * Computes `*result = *a - *b`, normalized so that 0 <= tv_nsec < 1000000000.
 * tv_sec is negative when `b` is later than `a`. Both inputs must be normalized.
 * `result` may point to the same object as `a` or `b`.
 */
static inline void rs_timersub(const struct timespec *a, const struct timespec *b,
                               struct timespec *result) {
  result->tv_sec = a->tv_sec - b->tv_sec;
  result->tv_nsec = a->tv_nsec - b->tv_nsec;
  // Borrow one second when the nanosecond difference went negative.
  if (result->tv_nsec < 0) {
    result->tv_sec -= 1;
    result->tv_nsec += 1000000000;
  }
}
273
TimedOut(struct timespec timeout)274 static inline int TimedOut(struct timespec timeout) {
275 // Check the elapsed processing time
276 static struct timespec now;
277 clock_gettime(CLOCK_MONOTONIC_RAW, &now);
278
279 if (__builtin_expect(rs_timer_ge(&now, &timeout), 0)) {
280 return RS_RESULT_TIMEDOUT;
281 }
282 return RS_RESULT_OK;
283 }
284
updateTimeout(struct timespec * timeout,int32_t durationNS)285 static inline void updateTimeout(struct timespec *timeout, int32_t durationNS) {
286 // 0 disables the timeout
287 if (durationNS == 0) {
288 durationNS = INT32_MAX;
289 }
290
291 struct timespec now;
292 struct timespec duration = { .tv_sec = durationNS / 1000,
293 .tv_nsec = ((durationNS % 1000) * 1000000) };
294 clock_gettime(CLOCK_MONOTONIC_RAW, &now);
295 rs_timeradd(&now, &duration, timeout);
296 //printf("sec %ld ms %ld\n", now.tv_sec, now.tv_nsec);
297 }
298
// Replaces the deadline of an index processor. NOTE(review): `base` is presumably a processor
// created by RPIndexIterator_New(), and `timeout` an absolute deadline like the one updateTimeout()
// produces - confirm in the implementation.
void updateRPIndexTimeout(ResultProcessor *base, struct timespec timeout);
300
// Accessors for a processor created by RPProfile_New(): the accumulated clock time (clock_t) and
// the result count it collected. NOTE(review): semantics inferred from names - confirm.
clock_t RPProfile_GetClock(ResultProcessor *rp);
uint64_t RPProfile_GetCount(ResultProcessor *rp);
303
// Returns a printable name for a ResultProcessorType value.
const char *RPTypeToString(ResultProcessorType type);
306
307 #ifdef __cplusplus
308 }
309 #endif
310 #endif // !RS_RESULT_PROCESSOR_H_
311