1 #ifndef RS_RESULT_PROCESSOR_H_
2 #define RS_RESULT_PROCESSOR_H_
3 
4 #include "redisearch.h"
5 #include "sortable.h"
6 #include "value.h"
7 #include "concurrent_ctx.h"
8 #include "search_ctx.h"
9 #include "index_iterator.h"
10 #include "search_options.h"
11 #include "rlookup.h"
12 #include "extension.h"
13 #include "score_explain.h"
14 
15 #ifdef __cplusplus
16 extern "C" {
17 #endif
18 /********************************************************************************
19  * Result Processor Chain
20  *
21  * We use a chain of result processors to sort, score, filter and page the results coming from the
22  * index.
23  *
24  * The index iterator tree is responsible for extracting results from the index, and the processor
25  * chain is responsible for processing those and preparing them for the users.
 * The processors expose an iterator interface, adding values to SearchResult objects.
27  *
28  * SearchResult objects contain all the data needed for a search result - from docId and score, to
29  * the actual fields loaded from redis.
30  *
31  * Processors can add more fields, rewrite them, change the score, etc.
32  * The query plan builds the chain based on the request, and then the chain just processes the
33  * results.
34  *
35  ********************************************************************************/
36 
/**
 * QITRState - processing state of a query, stored in QueryIterator::state.
 *
 *  QITR_S_RUNNING  - normal execution.
 *  QITR_S_ABORTED  - stop in our tracks and return nothing.
 *  QITR_S_TIMEDOUT - unlike ABORTED, lets the processors drain the results they
 *                    have already accumulated instead of returning nothing.
 */
typedef enum {
  QITR_S_RUNNING = 0,
  QITR_S_ABORTED = 1,
  QITR_S_TIMEDOUT = 2,
} QITRState;
46 
/**
 * ResultProcessorType - the kind of a result processor (ResultProcessor::type).
 *
 * RP_MAX is a sentinel rather than a processor kind: its value equals the
 * number of kinds listed before it.
 */
typedef enum {
  RP_INDEX = 0,
  RP_LOADER,
  RP_SCORER,
  RP_SORTER,
  RP_COUNTER,
  RP_PAGER_LIMITER,
  RP_HIGHLIGHTER,
  RP_GROUP,
  RP_PROJECTOR,
  RP_FILTER,
  RP_PROFILE,
  RP_NETWORK,
  RP_MAX
} ResultProcessorType;
62 
struct ResultProcessor;
struct RLookup;

/**
 * QueryIterator (alias QueryProcessingCtx) - shared state of one result
 * processor chain. Every ResultProcessor points back to it via `parent`.
 */
typedef struct {
  // First processor in the chain
  struct ResultProcessor *rootProc;

  // Last processor in the chain
  struct ResultProcessor *endProc;

  // Concurrent search context for thread switching
  ConcurrentSearchCtx *conc;

  // Search context; contains our spec (see RP_SPEC)
  RedisSearchCtx *sctx;

  // The minimal score applicable for a result. It can be used to optimize the scorers
  double minScore;

  // The total results found in the query, incremented by the root processors and decremented by
  // others who might disqualify results
  uint32_t totalResults;

  // Object which contains the error (consult it when Next() returns RS_RESULT_ERROR)
  QueryError *err;

  // The state - used for aborting queries (see QITRState)
  QITRState state;

  // NOTE(review): presumably the time query processing started; the clock
  // source is not visible here - confirm against the code that sets it.
  struct timespec startTime;
} QueryIterator, QueryProcessingCtx;
94 
// Chain-management API for a QueryIterator. Implementations are not visible in
// this header; the notes below are inferred from names and should be confirmed.

// Presumably returns the index iterator feeding the chain's root processor.
IndexIterator *QITR_GetRootFilter(QueryIterator *it);
// Presumably appends `rp` to the chain of `it`, linking it to the current end.
void QITR_PushRP(QueryIterator *it, struct ResultProcessor *rp);
// Presumably frees every processor in the chain of `qitr`.
void QITR_FreeChain(QueryIterator *qitr);
98 
/*
 * SearchResult - the object all the processing chain is working on.
 * It has the indexResult which is what the index scan brought - scores, vectors, flags, etc.
 *
 * And a list of fields loaded by the chain - currently only by the loader, but possibly by
 * aggregators later on.
 */
typedef struct {
  t_docId docId;

  // Not all results have a score - TBD
  double score;
  // Optional explanation of how `score` was computed
  RSScoreExplain *scoreExplain;

  RSDocumentMetadata *dmd;

  // The index result should cover what you need for highlighting,
  // but we will add a method to duplicate index results to make
  // them thread safe
  RSIndexResult *indexResult;

  // Row data. Use RLookup_* functions to access
  RLookupRow rowdata;
} SearchResult;
123 
/**
 * RPStatus - result processor return codes, i.e. the possible values
 * returned by ResultProcessor::Next().
 */
typedef enum {
  RS_RESULT_OK = 0,        // The result is filled with valid data.
  RS_RESULT_EOF = 1,       // The result is empty; the last result was already returned.
  RS_RESULT_PAUSED = 2,    // Execution paused (e.g. due to rate limiting).
  RS_RESULT_TIMEDOUT = 3,  // Execution halted because of a timeout.
  RS_RESULT_ERROR = 4,     // Aborted because of an error; the chain's error object
                           // (QueryIterator::err) should have more information.
  RS_RESULT_MAX = 5        // Not a return code per se: marks the end of the public
                           // codes. Implementations can use values from here on
                           // for extensions.
} RPStatus;
143 
/**
 * Result processor structure. This should be "subclassed" by the actual
 * implementations (embed it as the first member and cast).
 */
typedef struct ResultProcessor {
  // Reference to the parent structure (the chain this processor belongs to)
  QueryIterator *parent;

  // Previous result processor in the chain
  struct ResultProcessor *upstream;

  // Type of result processor
  ResultProcessorType type;

  /**
   * Populates the result pointed to by `res`. The existing data of `res` is
   * not read, so it is the responsibility of the caller to ensure that there
   * are no refcount leaks in the structure.
   *
   * Users can use SearchResult_Clear() to reset the structure without freeing
   * it.
   *
   * The populated structure (if RS_RESULT_OK is returned) does contain references
   * to document data. Callers *MUST* ensure they are eventually freed.
   *
   * Returns one of the RPStatus values (declared as int).
   */
  int (*Next)(struct ResultProcessor *self, SearchResult *res);

  /** Frees the processor and any internal data related to it. */
  void (*Free)(struct ResultProcessor *self);
} ResultProcessor;
174 
/* RP_SPEC(rp): the index spec reachable from a result processor, i.e.
 * rp->parent (the chain) -> sctx (search context) -> spec. `rp` is evaluated once. */
#define RP_SPEC(rp) ((((rp)->parent)->sctx)->spec)
177 
/**
 * This function resets the search result, so that it may be reused again.
 * Internal caches are reset but not freed.
 */
void SearchResult_Clear(SearchResult *r);

/**
 * This function clears the search result, also freeing its internals. Internal
 * caches are freed. Use this function if `r` will not be used again.
 */
void SearchResult_Destroy(SearchResult *r);

/**
 * Creates the processor that pulls results from the index iterator `itr`.
 * NOTE(review): `timeoutTime` is presumably an absolute deadline of the kind
 * produced by updateTimeout() and checked with TimedOut() - confirm.
 */
ResultProcessor *RPIndexIterator_New(IndexIterator *itr, struct timespec timeoutTime);

/**
 * Creates a scoring processor from the scoring-function context `funcs` and
 * its arguments `fnargs` (semantics defined by the extension API - not visible here).
 */
ResultProcessor *RPScorer_New(const ExtScoringFunctionCtx *funcs,
                              const ScoringFunctionArgs *fnargs);
194 
/**
 * Sort-direction bitmap helpers; they hide the bitwise logic.
 *
 * Bit `idx` of the 64-bit map is set when sort key `idx` is ascending and
 * cleared when it is descending. SORTASCMAP_INIT marks every position ascending.
 * SORTASCMAP_GETASC yields the masked bit itself (non-zero when ascending), not 0/1.
 * NOTE(review): SORTASCMAP_MAXFIELDS presumably caps the number of sort keys - confirm.
 */
#define SORTASCMAP_INIT 0xFFFFFFFFFFFFFFFF
#define SORTASCMAP_MAXFIELDS 8
#define SORTASCMAP_SETASC(map, idx) ((map) |= (1ULL << (idx)))
#define SORTASCMAP_SETDESC(map, idx) ((map) &= ~(1ULL << (idx)))
#define SORTASCMAP_GETASC(map, idx) ((map) & (1ULL << (idx)))
void SortAscMap_Dump(uint64_t v, size_t n);
202 
/**
 * Creates a sorter keyed on the `nkeys` lookup keys in `keys`, keeping at most
 * `maxresults` results. NOTE(review): `ascendingMap` presumably uses the
 * SORTASCMAP_* bit layout (bit i set = key i ascending) - confirm.
 */
ResultProcessor *RPSorter_NewByFields(size_t maxresults, const RLookupKey **keys, size_t nkeys,
                                      uint64_t ascendingMap);

/** Creates a sorter ordering by score, keeping at most `maxresults` results. */
ResultProcessor *RPSorter_NewByScore(size_t maxresults);

/** Creates a pager that skips `offset` results and emits at most `limit` (presumably). */
ResultProcessor *RPPager_New(size_t offset, size_t limit);
209 
/*******************************************************************************************************************
 *  Loading Processor
 *
 * This processor simply takes the search results, and based on the request parameters, loads the
 * relevant fields for the results that need to be displayed to the user, from redis.
 *
 * It fills the result objects' field map with values corresponding to the requested return fields.
 * `keys`/`nkeys` presumably list the fields to load, resolved against lookup `lk`.
 *
 *******************************************************************************************************************/
ResultProcessor *RPLoader_New(RLookup *lk, const RLookupKey **keys, size_t nkeys);

/** Creates a new Highlight processor */
ResultProcessor *RPHighlighter_New(const RSSearchOptions *searchopts, const FieldList *fields,
                                   const RLookup *lookup);

// NOTE(review): presumably prints the chain ending at `rp` (following `upstream`) for debugging.
void RP_DumpChain(const ResultProcessor *rp);
226 
227 
/*******************************************************************************************************************
 *  Profiling Processor
 *
 * This processor collects time and count info about the performance of its upstream RP.
 * See RPProfile_GetClock() / RPProfile_GetCount() for reading the collected values.
 * `rp` is presumably the processor being measured, within the chain of `qiter` - confirm.
 *
 *******************************************************************************************************************/
ResultProcessor *RPProfile_New(ResultProcessor *rp, QueryIterator *qiter);
235 
236 
237 /*******************************************************************************************************************
238  *  Counter Processor
239  *
240  * This processor counts the search results.
241  *
242  *******************************************************************************************************************/
243 ResultProcessor *RPCounter_New();
244 
245 /*****************************************
246  *            Timeout API
247  ****************************************/
248 
/**
 * Returns non-zero if time `a` is at or after time `b` (a >= b), 0 otherwise.
 * Both values are expected to be normalized (0 <= tv_nsec < 1000000000).
 * Neither argument is modified.
 */
static inline int rs_timer_ge(const struct timespec *a, const struct timespec *b) {
  if (a->tv_sec != b->tv_sec) {
    return a->tv_sec > b->tv_sec;
  }
  return a->tv_nsec >= b->tv_nsec;
}
255 
/**
 * Stores a + b into *result, carrying nanosecond overflow into seconds.
 * Inputs are expected to be normalized (0 <= tv_nsec < 1000000000), so at
 * most one carry is needed. `result` may alias `a` or `b`.
 */
static inline void rs_timeradd(const struct timespec *a, const struct timespec *b,
                               struct timespec *result) {
  const long nsecPerSec = 1000000000L;
  result->tv_sec = a->tv_sec + b->tv_sec;
  result->tv_nsec = a->tv_nsec + b->tv_nsec;
  if (result->tv_nsec >= nsecPerSec) {
    result->tv_sec += 1;
    result->tv_nsec -= nsecPerSec;
  }
}
264 
/**
 * Stores a - b into *result, borrowing a second when the nanosecond
 * difference goes negative. Inputs are expected to be normalized
 * (0 <= tv_nsec < 1000000000). `result` may alias `a` or `b`.
 */
static inline void rs_timersub(const struct timespec *a, const struct timespec *b,
                               struct timespec *result) {
  const long nsecPerSec = 1000000000L;
  result->tv_sec = a->tv_sec - b->tv_sec;
  result->tv_nsec = a->tv_nsec - b->tv_nsec;
  if (result->tv_nsec < 0) {
    result->tv_sec -= 1;
    result->tv_nsec += nsecPerSec;
  }
}
273 
TimedOut(struct timespec timeout)274 static inline int TimedOut(struct timespec timeout) {
275   // Check the elapsed processing time
276   static struct timespec now;
277   clock_gettime(CLOCK_MONOTONIC_RAW, &now);
278 
279   if (__builtin_expect(rs_timer_ge(&now, &timeout), 0)) {
280     return RS_RESULT_TIMEDOUT;
281   }
282   return RS_RESULT_OK;
283 }
284 
updateTimeout(struct timespec * timeout,int32_t durationNS)285 static inline void updateTimeout(struct timespec *timeout, int32_t durationNS) {
286   // 0 disables the timeout
287   if (durationNS == 0) {
288     durationNS = INT32_MAX;
289   }
290 
291   struct timespec now;
292   struct timespec duration = { .tv_sec = durationNS / 1000,
293                               .tv_nsec = ((durationNS % 1000) * 1000000) };
294   clock_gettime(CLOCK_MONOTONIC_RAW, &now);
295   rs_timeradd(&now, &duration, timeout);
296   //printf("sec %ld ms %ld\n", now.tv_sec, now.tv_nsec);
297 }
298 
// NOTE(review): presumably replaces the deadline of the index-iterator processor
// `base` (see RPIndexIterator_New) with `timeout` - confirm against the implementation.
void updateRPIndexTimeout(ResultProcessor *base, struct timespec timeout);

// Accessors for the values collected by a profiling processor (see RPProfile_New):
// accumulated clock time and a count, presumably of results passed through.
clock_t RPProfile_GetClock(ResultProcessor *rp);
uint64_t RPProfile_GetCount(ResultProcessor *rp);

// Return string for RPType
const char *RPTypeToString(ResultProcessorType type);
306 
307 #ifdef __cplusplus
308 }
309 #endif
310 #endif  // !RS_RESULT_PROCESSOR_H_
311