1 #ifndef RS_CONCERRNT_CTX_
2 #define RS_CONCERRNT_CTX_
3
4 #include "redisearch.h"
5 #include "redismodule.h"
6 #include "config.h"
7 #include <time.h>
8 #include <dep/thpool/thpool.h>
9
10 #if defined(__FreeBSD__) || defined(__DragonFly__)
11 #define CLOCK_MONOTONIC_RAW CLOCK_MONOTONIC
12 #endif
13
14 /** Concurrent Search Exection Context.
15 *
16 * We allow queries to run concurrently, each running on its own thread, locking the redis GIL
17 * for a bit, releasing it, and letting others run as well.
18 *
19 * The queries do not really run in parallel, but one at a time, competing over the global lock.
20 * This does not speed processing - in fact it can actually slow it down. But it prevents a
21 * common situation, where very slow queries block the entire redis instance for a long time.
22 *
23 * We intend to switch this model to a single thread running multiple "coroutines", but for now
24 * this naive implementation is good enough and will fix the search concurrency issue.
25 *
26 * The ConcurrentSearchCtx is part of a query, and the query calls the CONCURRENT_CTX_TICK macro
27 * for every "cycle" - meaning a processed search result. The concurrency engine will switch
28 * execution to another query when the current thread has spent enough time working.
29 *
30 * The current switch threshold is 200 microseconds. Since measuring time is slow in itself (~50ns)
31 * we sample the elapsed time every 20 "cycles" of the query processor.
32 *
33 */
34
35 typedef void (*ConcurrentReopenCallback)(void *ctx);
36
37 /* ConcurrentKeyCtx is a reference to a key that's being held open during concurrent execution and
38 * needs to be reopened after yielding and gaining back execution. See ConcurrentSearch_AddKey for
39 * more details */
40 typedef struct {
41 void *privdata;
42 ConcurrentReopenCallback cb;
43 // A custom callback to free privdata. If NULL we don't do anything
44 void (*freePrivData)(void *);
45 } ConcurrentKeyCtx;
46
47 /* The concurrent execution context struct itself. See above for details */
48 typedef struct {
49 long long ticker;
50 struct timespec lastTime;
51 RedisModuleCtx *ctx;
52 ConcurrentKeyCtx *openKeys;
53 uint32_t numOpenKeys;
54 uint32_t isLocked;
55 } ConcurrentSearchCtx;
56
57 /** The maximal size of the concurrent query thread pool. Since only one thread is operational at a
58 * time, it's not a problem besides memory consumption, to have much more threads than CPU cores.
59 * By default the pool starts with just one thread, and scales up as needed */
60
61 /**
62 * The maximum number of threads performing indexing on documents.
63 * It's good to set this to approximately the number of CPUs running.
64 *
65 * NOTE: This is merely the *fallback* value if for some reason the number of
66 * CPUs cannot be automatically determined. If you want to force the number
67 * of tokenizer threads, make sure you also disable the CPU detection in the
68 * source file
69 */
70
71 /** The number of execution "ticks" per elapsed time check. This is intended to reduce the number of
72 * calls to clock_gettime() */
73 #define CONCURRENT_TICK_CHECK 50
74
75 /** The timeout after which we try to switch to another query thread - in Nanoseconds */
76 #define CONCURRENT_TIMEOUT_NS 100000
77
78 /* Add a "monitored" key to the context. When keys are open during concurrent execution, they need
79 * to be closed before we yield execution and release the GIL, and reopened when we get back the
80 * execution context.
81 * To simplify this, each place in the program that holds a reference to a redis key
82 * based data, registers itself and the key to be automatically reopened.
83 *
84 * After reopening, a callback
85 * is being called to notify the key holder that it has been reopened, and handle the consequences.
86 * This is used by index iterators to avoid holding reference to deleted keys or changed data.
87 *
88 * We register the key, the flags to reopen it, a string holding its name for reopening, a callback
89 * for notification, and private callback data. if freePrivDataCallback is provided, we will call it
90 * when the context is freed to release the private data. If NULL is passed, we do nothing */
91 void ConcurrentSearch_AddKey(ConcurrentSearchCtx *ctx, ConcurrentReopenCallback cb,
92 void *privdata, void (*freePrivDataCallback)(void *));
93
94 /**
95 * Replace the key at a given position. The context must not be locked. It
96 * is assumed that the callback for the key remains the same
97 * - redisCtx is the redis module context which owns this key
98 * - keyName is the name of the new key
99 * - pos is the position at which the key resides (usually 0)
100 * - arg is the new arg to be passed to the callback
101 */
ConcurrentSearch_SetKey(ConcurrentSearchCtx * ctx,RedisModuleString * keyName,void * privdata)102 static inline void ConcurrentSearch_SetKey(ConcurrentSearchCtx *ctx, RedisModuleString *keyName,
103 void *privdata) {
104 ctx->openKeys[0].privdata = privdata;
105 }
106
107 /** Start the concurrent search thread pool. Should be called when initializing the module */
108 void ConcurrentSearch_ThreadPoolStart();
109 void ConcurrentSearch_ThreadPoolDestroy(void);
110
111 /* Create a new thread pool, and return its identifying id */
112 int ConcurrentSearch_CreatePool(int numThreads);
113
114 extern int CONCURRENT_POOL_INDEX;
115 extern int CONCURRENT_POOL_SEARCH;
116
117 /* Run a function on the concurrent thread pool */
118 void ConcurrentSearch_ThreadPoolRun(void (*func)(void *), void *arg, int type);
119
120 /** Check the elapsed timer, and release the lock if enough time has passed.
121 * Return 1 if switching took place
122 */
123 int ConcurrentSearch_CheckTimer(ConcurrentSearchCtx *ctx);
124
125 /** Initialize and reset a concurrent search ctx */
126 void ConcurrentSearchCtx_Init(RedisModuleCtx *rctx, ConcurrentSearchCtx *ctx);
127
128 /**
129 * Initialize a concurrent context to contain a single key. This key can be swapped
130 * out via SetKey()
131 */
132 void ConcurrentSearchCtx_InitSingle(ConcurrentSearchCtx *ctx, RedisModuleCtx *rctx, ConcurrentReopenCallback cb);
133
134 /** Reset the clock variables in the concurrent search context */
135 void ConcurrentSearchCtx_ResetClock(ConcurrentSearchCtx *ctx);
136
137 /* Free the execution context's dynamically allocated resources */
138 void ConcurrentSearchCtx_Free(ConcurrentSearchCtx *ctx);
139
140 void ConcurrentSearchCtx_Lock(ConcurrentSearchCtx *ctx);
141
142 void ConcurrentSearchCtx_Unlock(ConcurrentSearchCtx *ctx);
143
144 void ConcurrentSearchCtx_ReopenKeys(ConcurrentSearchCtx *ctx);
145
146 struct ConcurrentCmdCtx;
147 typedef void (*ConcurrentCmdHandler)(RedisModuleCtx *, RedisModuleString **, int,
148 struct ConcurrentCmdCtx *);
149
150 #define CMDCTX_KEEP_RCTX 0x01
151 #define CMDCTX_NO_GIL 0x02
152
153 /**
154 * Take ownership of the underlying Redis command context. Once ownership is
155 * claimed, the context needs to be freed (at some point in the future) via
156 * RM_FreeThreadSafeContext()
157 *
158 * TODO/FIXME:
159 * The context is tied to a BlockedCLient, but it shouldn't actually utilize it.
160 * Need to add an API to Redis to better manage a thread safe context, or to
161 * otherwise 'detach' it from the Client so that trying to perform I/O on it
162 * would result in an error rather than simply using a dangling pointer.
163 */
164 void ConcurrentCmdCtx_KeepRedisCtx(struct ConcurrentCmdCtx *ctx);
165
166 int ConcurrentSearch_HandleRedisCommand(int poolType, ConcurrentCmdHandler handler,
167 RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
168
169 /* Same as handleRedis command, but set flags for the concurrent context */
170 int ConcurrentSearch_HandleRedisCommandEx(int poolType, int options, ConcurrentCmdHandler handler,
171 RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
172
173 /** This macro is called by concurrent executors (currently the query only).
174 * It checks if enough time has passed and releases the global lock if that is the case.
175 */
176 #define CONCURRENT_CTX_TICK(x) \
177 ({ \
178 int conctx__didSwitch = 0; \
179 if ((x) && ++(x)->ticker % CONCURRENT_TICK_CHECK == 0) { \
180 if (ConcurrentSearch_CheckTimer((x))) { \
181 conctx__didSwitch = 1; \
182 } \
183 } \
184 conctx__didSwitch; \
185 })
186
187 // Check if the current request can be executed in a threadb
CheckConcurrentSupport(RedisModuleCtx * ctx)188 static inline int CheckConcurrentSupport(RedisModuleCtx *ctx) {
189 // See if this client should be concurrent
190 if (!RSGlobalConfig.concurrentMode) {
191 return 0;
192 }
193
194 // Redis cannot use blocked contexts in lua and/or multi commands. Concurrent
195 // search relies on blocking a client. In such cases, force non-concurrent
196 // search mode.
197 if (RedisModule_GetContextFlags && (RedisModule_GetContextFlags(ctx) &
198 (REDISMODULE_CTX_FLAGS_LUA | REDISMODULE_CTX_FLAGS_MULTI))) {
199 return 0;
200 }
201 return 1;
202 }
203
204 #endif
205