1 #ifndef RS_CONCERRNT_CTX_
2 #define RS_CONCERRNT_CTX_
3 
4 #include "redisearch.h"
5 #include "redismodule.h"
6 #include "config.h"
7 #include <time.h>
8 #include <dep/thpool/thpool.h>
9 
10 #if defined(__FreeBSD__) || defined(__DragonFly__)
11 #define CLOCK_MONOTONIC_RAW CLOCK_MONOTONIC
12 #endif
13 
14 /** Concurrent Search Exection Context.
15  *
16  * We allow queries to run concurrently, each running on its own thread, locking the redis GIL
17  * for a bit, releasing it, and letting others run as well.
18  *
19  * The queries do not really run in parallel, but one at a time, competing over the global lock.
20  * This does not speed processing - in fact it can actually slow it down. But it prevents a
21  * common situation, where very slow queries block the entire redis instance for a long time.
22  *
23  * We intend to switch this model to a single thread running multiple "coroutines", but for now
24  * this naive implementation is good enough and will fix the search concurrency issue.
25  *
26  * The ConcurrentSearchCtx is part of a query, and the query calls the CONCURRENT_CTX_TICK macro
27  * for every "cycle" - meaning a processed search result. The concurrency engine will switch
28  * execution to another query when the current thread has spent enough time working.
29  *
30  * The current switch threshold is 200 microseconds. Since measuring time is slow in itself (~50ns)
31  * we sample the elapsed time every 20 "cycles" of the query processor.
32  *
33  */
34 
35 typedef void (*ConcurrentReopenCallback)(void *ctx);
36 
37 /* ConcurrentKeyCtx is a reference to a key that's being held open during concurrent execution and
38  * needs to be reopened after yielding and gaining back execution. See ConcurrentSearch_AddKey for
39  * more details */
40 typedef struct {
41   void *privdata;
42   ConcurrentReopenCallback cb;
43   // A custom callback to free privdata. If NULL we don't do anything
44   void (*freePrivData)(void *);
45 } ConcurrentKeyCtx;
46 
47 /* The concurrent execution context struct itself. See above for details */
48 typedef struct {
49   long long ticker;
50   struct timespec lastTime;
51   RedisModuleCtx *ctx;
52   ConcurrentKeyCtx *openKeys;
53   uint32_t numOpenKeys;
54   uint32_t isLocked;
55 } ConcurrentSearchCtx;
56 
57 /** The maximal size of the concurrent query thread pool. Since only one thread is operational at a
58  * time, it's not a problem besides memory consumption, to have much more threads than CPU cores.
59  * By default the pool starts with just one thread, and scales up as needed  */
60 
61 /**
62  * The maximum number of threads performing indexing on documents.
63  * It's good to set this to approximately the number of CPUs running.
64  *
65  * NOTE: This is merely the *fallback* value if for some reason the number of
66  * CPUs cannot be automatically determined. If you want to force the number
67  * of tokenizer threads, make sure you also disable the CPU detection in the
68  * source file
69  */
70 
71 /** The number of execution "ticks" per elapsed time check. This is intended to reduce the number of
72  * calls to clock_gettime() */
73 #define CONCURRENT_TICK_CHECK 50
74 
75 /** The timeout after which we try to switch to another query thread - in Nanoseconds */
76 #define CONCURRENT_TIMEOUT_NS 100000
77 
78 /* Add a "monitored" key to the context. When keys are open during concurrent execution, they need
79  * to be closed before we yield execution and release the GIL, and reopened when we get back the
80  * execution context.
81  * To simplify this, each place in the program that holds a reference to a redis key
82  * based data, registers itself and the key to be automatically reopened.
83  *
84  * After reopening, a callback
85  * is being called to notify the key holder that it has been reopened, and handle the consequences.
86  * This is used by index iterators to avoid holding reference to deleted keys or changed data.
87  *
88  * We register the key, the flags to reopen it, a string holding its name for reopening, a callback
89  * for notification, and private callback data. if freePrivDataCallback is provided, we will call it
90  * when the context is freed to release the private data. If NULL is passed, we do nothing */
91 void ConcurrentSearch_AddKey(ConcurrentSearchCtx *ctx, ConcurrentReopenCallback cb,
92                              void *privdata, void (*freePrivDataCallback)(void *));
93 
94 /**
95  * Replace the key at a given position. The context must not be locked. It
96  * is assumed that the callback for the key remains the same
97  * - redisCtx is the redis module context which owns this key
98  * - keyName is the name of the new key
99  * - pos is the position at which the key resides (usually 0)
100  * - arg is the new arg to be passed to the callback
101  */
ConcurrentSearch_SetKey(ConcurrentSearchCtx * ctx,RedisModuleString * keyName,void * privdata)102 static inline void ConcurrentSearch_SetKey(ConcurrentSearchCtx *ctx, RedisModuleString *keyName,
103                                            void *privdata) {
104   ctx->openKeys[0].privdata = privdata;
105 }
106 
107 /** Start the concurrent search thread pool. Should be called when initializing the module */
108 void ConcurrentSearch_ThreadPoolStart();
109 void ConcurrentSearch_ThreadPoolDestroy(void);
110 
111 /* Create a new thread pool, and return its identifying id */
112 int ConcurrentSearch_CreatePool(int numThreads);
113 
114 extern int CONCURRENT_POOL_INDEX;
115 extern int CONCURRENT_POOL_SEARCH;
116 
117 /* Run a function on the concurrent thread pool */
118 void ConcurrentSearch_ThreadPoolRun(void (*func)(void *), void *arg, int type);
119 
120 /** Check the elapsed timer, and release the lock if enough time has passed.
121  * Return 1 if switching took place
122  */
123 int ConcurrentSearch_CheckTimer(ConcurrentSearchCtx *ctx);
124 
125 /** Initialize and reset a concurrent search ctx */
126 void ConcurrentSearchCtx_Init(RedisModuleCtx *rctx, ConcurrentSearchCtx *ctx);
127 
128 /**
129  * Initialize a concurrent context to contain a single key. This key can be swapped
130  * out via SetKey()
131  */
132 void ConcurrentSearchCtx_InitSingle(ConcurrentSearchCtx *ctx, RedisModuleCtx *rctx, ConcurrentReopenCallback cb);
133 
134 /** Reset the clock variables in the concurrent search context */
135 void ConcurrentSearchCtx_ResetClock(ConcurrentSearchCtx *ctx);
136 
137 /* Free the execution context's dynamically allocated resources */
138 void ConcurrentSearchCtx_Free(ConcurrentSearchCtx *ctx);
139 
140 void ConcurrentSearchCtx_Lock(ConcurrentSearchCtx *ctx);
141 
142 void ConcurrentSearchCtx_Unlock(ConcurrentSearchCtx *ctx);
143 
144 void ConcurrentSearchCtx_ReopenKeys(ConcurrentSearchCtx *ctx);
145 
146 struct ConcurrentCmdCtx;
147 typedef void (*ConcurrentCmdHandler)(RedisModuleCtx *, RedisModuleString **, int,
148                                      struct ConcurrentCmdCtx *);
149 
150 #define CMDCTX_KEEP_RCTX 0x01
151 #define CMDCTX_NO_GIL 0x02
152 
153 /**
154  * Take ownership of the underlying Redis command context. Once ownership is
155  * claimed, the context needs to be freed (at some point in the future) via
156  * RM_FreeThreadSafeContext()
157  *
158  * TODO/FIXME:
159  * The context is tied to a BlockedCLient, but it shouldn't actually utilize it.
160  * Need to add an API to Redis to better manage a thread safe context, or to
161  * otherwise 'detach' it from the Client so that trying to perform I/O on it
162  * would result in an error rather than simply using a dangling pointer.
163  */
164 void ConcurrentCmdCtx_KeepRedisCtx(struct ConcurrentCmdCtx *ctx);
165 
166 int ConcurrentSearch_HandleRedisCommand(int poolType, ConcurrentCmdHandler handler,
167                                         RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
168 
169 /* Same as handleRedis command, but set flags for the concurrent context */
170 int ConcurrentSearch_HandleRedisCommandEx(int poolType, int options, ConcurrentCmdHandler handler,
171                                           RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
172 
173 /** This macro is called by concurrent executors (currently the query only).
174  * It checks if enough time has passed and releases the global lock if that is the case.
175  */
176 #define CONCURRENT_CTX_TICK(x)                               \
177   ({                                                         \
178     int conctx__didSwitch = 0;                               \
179     if ((x) && ++(x)->ticker % CONCURRENT_TICK_CHECK == 0) { \
180       if (ConcurrentSearch_CheckTimer((x))) {                \
181         conctx__didSwitch = 1;                               \
182       }                                                      \
183     }                                                        \
184     conctx__didSwitch;                                       \
185   })
186 
187 // Check if the current request can be executed in a threadb
CheckConcurrentSupport(RedisModuleCtx * ctx)188 static inline int CheckConcurrentSupport(RedisModuleCtx *ctx) {
189   // See if this client should be concurrent
190   if (!RSGlobalConfig.concurrentMode) {
191     return 0;
192   }
193 
194   // Redis cannot use blocked contexts in lua and/or multi commands. Concurrent
195   // search relies on blocking a client. In such cases, force non-concurrent
196   // search mode.
197   if (RedisModule_GetContextFlags && (RedisModule_GetContextFlags(ctx) &
198                                       (REDISMODULE_CTX_FLAGS_LUA | REDISMODULE_CTX_FLAGS_MULTI))) {
199     return 0;
200   }
201   return 1;
202 }
203 
204 #endif
205