1 #define REDISMODULE_EXPERIMENTAL_API
2 #define _XOPEN_SOURCE 700
3 #include "redismodule.h"
4 #include <stdio.h>
5 #include <stdlib.h>
6 #include <pthread.h>
7 #include <time.h>
8
9 #define UNUSED(x) (void)(x)
10
11 /* Reply callback for blocking command BLOCK.DEBUG */
HelloBlock_Reply(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)12 int HelloBlock_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
13 UNUSED(argv);
14 UNUSED(argc);
15 int *myint = RedisModule_GetBlockedClientPrivateData(ctx);
16 return RedisModule_ReplyWithLongLong(ctx,*myint);
17 }
18
19 /* Timeout callback for blocking command BLOCK.DEBUG */
HelloBlock_Timeout(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)20 int HelloBlock_Timeout(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
21 UNUSED(argv);
22 UNUSED(argc);
23 RedisModuleBlockedClient *bc = RedisModule_GetBlockedClientHandle(ctx);
24 RedisModule_BlockedClientMeasureTimeEnd(bc);
25 return RedisModule_ReplyWithSimpleString(ctx,"Request timedout");
26 }
27
28 /* Private data freeing callback for BLOCK.DEBUG command. */
HelloBlock_FreeData(RedisModuleCtx * ctx,void * privdata)29 void HelloBlock_FreeData(RedisModuleCtx *ctx, void *privdata) {
30 UNUSED(ctx);
31 RedisModule_Free(privdata);
32 }
33
34 /* Private data freeing callback for BLOCK.BLOCK command. */
HelloBlock_FreeStringData(RedisModuleCtx * ctx,void * privdata)35 void HelloBlock_FreeStringData(RedisModuleCtx *ctx, void *privdata) {
36 RedisModule_FreeString(ctx, (RedisModuleString*)privdata);
37 }
38
39 /* The thread entry point that actually executes the blocking part
40 * of the command BLOCK.DEBUG. */
BlockDebug_ThreadMain(void * arg)41 void *BlockDebug_ThreadMain(void *arg) {
42 void **targ = arg;
43 RedisModuleBlockedClient *bc = targ[0];
44 long long delay = (unsigned long)targ[1];
45 long long enable_time_track = (unsigned long)targ[2];
46 if (enable_time_track)
47 RedisModule_BlockedClientMeasureTimeStart(bc);
48 RedisModule_Free(targ);
49
50 struct timespec ts;
51 ts.tv_sec = delay / 1000;
52 ts.tv_nsec = (delay % 1000) * 1000000;
53 nanosleep(&ts, NULL);
54 int *r = RedisModule_Alloc(sizeof(int));
55 *r = rand();
56 if (enable_time_track)
57 RedisModule_BlockedClientMeasureTimeEnd(bc);
58 RedisModule_UnblockClient(bc,r);
59 return NULL;
60 }
61
62 /* The thread entry point that actually executes the blocking part
63 * of the command BLOCK.DOUBLE_DEBUG. */
DoubleBlock_ThreadMain(void * arg)64 void *DoubleBlock_ThreadMain(void *arg) {
65 void **targ = arg;
66 RedisModuleBlockedClient *bc = targ[0];
67 long long delay = (unsigned long)targ[1];
68 RedisModule_BlockedClientMeasureTimeStart(bc);
69 RedisModule_Free(targ);
70 struct timespec ts;
71 ts.tv_sec = delay / 1000;
72 ts.tv_nsec = (delay % 1000) * 1000000;
73 nanosleep(&ts, NULL);
74 int *r = RedisModule_Alloc(sizeof(int));
75 *r = rand();
76 RedisModule_BlockedClientMeasureTimeEnd(bc);
77 /* call again RedisModule_BlockedClientMeasureTimeStart() and
78 * RedisModule_BlockedClientMeasureTimeEnd and ensure that the
79 * total execution time is 2x the delay. */
80 RedisModule_BlockedClientMeasureTimeStart(bc);
81 nanosleep(&ts, NULL);
82 RedisModule_BlockedClientMeasureTimeEnd(bc);
83
84 RedisModule_UnblockClient(bc,r);
85 return NULL;
86 }
87
HelloBlock_Disconnected(RedisModuleCtx * ctx,RedisModuleBlockedClient * bc)88 void HelloBlock_Disconnected(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc) {
89 RedisModule_Log(ctx,"warning","Blocked client %p disconnected!",
90 (void*)bc);
91 }
92
93 /* BLOCK.DEBUG <delay_ms> <timeout_ms> -- Block for <count> milliseconds, then reply with
94 * a random number. Timeout is the command timeout, so that you can test
95 * what happens when the delay is greater than the timeout. */
HelloBlock_RedisCommand(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)96 int HelloBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
97 if (argc != 3) return RedisModule_WrongArity(ctx);
98 long long delay;
99 long long timeout;
100
101 if (RedisModule_StringToLongLong(argv[1],&delay) != REDISMODULE_OK) {
102 return RedisModule_ReplyWithError(ctx,"ERR invalid count");
103 }
104
105 if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK) {
106 return RedisModule_ReplyWithError(ctx,"ERR invalid count");
107 }
108
109 pthread_t tid;
110 RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout);
111
112 /* Here we set a disconnection handler, however since this module will
113 * block in sleep() in a thread, there is not much we can do in the
114 * callback, so this is just to show you the API. */
115 RedisModule_SetDisconnectCallback(bc,HelloBlock_Disconnected);
116
117 /* Now that we setup a blocking client, we need to pass the control
118 * to the thread. However we need to pass arguments to the thread:
119 * the delay and a reference to the blocked client handle. */
120 void **targ = RedisModule_Alloc(sizeof(void*)*3);
121 targ[0] = bc;
122 targ[1] = (void*)(unsigned long) delay;
123 // pass 1 as flag to enable time tracking
124 targ[2] = (void*)(unsigned long) 1;
125
126 if (pthread_create(&tid,NULL,BlockDebug_ThreadMain,targ) != 0) {
127 RedisModule_AbortBlock(bc);
128 return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
129 }
130 return REDISMODULE_OK;
131 }
132
133 /* BLOCK.DEBUG_NOTRACKING <delay_ms> <timeout_ms> -- Block for <count> milliseconds, then reply with
134 * a random number. Timeout is the command timeout, so that you can test
135 * what happens when the delay is greater than the timeout.
136 * this command does not track background time so the background time should no appear in stats*/
HelloBlockNoTracking_RedisCommand(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)137 int HelloBlockNoTracking_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
138 if (argc != 3) return RedisModule_WrongArity(ctx);
139 long long delay;
140 long long timeout;
141
142 if (RedisModule_StringToLongLong(argv[1],&delay) != REDISMODULE_OK) {
143 return RedisModule_ReplyWithError(ctx,"ERR invalid count");
144 }
145
146 if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK) {
147 return RedisModule_ReplyWithError(ctx,"ERR invalid count");
148 }
149
150 pthread_t tid;
151 RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout);
152
153 /* Here we set a disconnection handler, however since this module will
154 * block in sleep() in a thread, there is not much we can do in the
155 * callback, so this is just to show you the API. */
156 RedisModule_SetDisconnectCallback(bc,HelloBlock_Disconnected);
157
158 /* Now that we setup a blocking client, we need to pass the control
159 * to the thread. However we need to pass arguments to the thread:
160 * the delay and a reference to the blocked client handle. */
161 void **targ = RedisModule_Alloc(sizeof(void*)*3);
162 targ[0] = bc;
163 targ[1] = (void*)(unsigned long) delay;
164 // pass 0 as flag to enable time tracking
165 targ[2] = (void*)(unsigned long) 0;
166
167 if (pthread_create(&tid,NULL,BlockDebug_ThreadMain,targ) != 0) {
168 RedisModule_AbortBlock(bc);
169 return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
170 }
171 return REDISMODULE_OK;
172 }
173
174 /* BLOCK.DOUBLE_DEBUG <delay_ms> -- Block for 2 x <count> milliseconds,
175 * then reply with a random number.
176 * This command is used to test multiple calls to RedisModule_BlockedClientMeasureTimeStart()
177 * and RedisModule_BlockedClientMeasureTimeEnd() within the same execution. */
HelloDoubleBlock_RedisCommand(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)178 int HelloDoubleBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
179 if (argc != 2) return RedisModule_WrongArity(ctx);
180 long long delay;
181
182 if (RedisModule_StringToLongLong(argv[1],&delay) != REDISMODULE_OK) {
183 return RedisModule_ReplyWithError(ctx,"ERR invalid count");
184 }
185
186 pthread_t tid;
187 RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,0);
188
189 /* Now that we setup a blocking client, we need to pass the control
190 * to the thread. However we need to pass arguments to the thread:
191 * the delay and a reference to the blocked client handle. */
192 void **targ = RedisModule_Alloc(sizeof(void*)*2);
193 targ[0] = bc;
194 targ[1] = (void*)(unsigned long) delay;
195
196 if (pthread_create(&tid,NULL,DoubleBlock_ThreadMain,targ) != 0) {
197 RedisModule_AbortBlock(bc);
198 return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
199 }
200 return REDISMODULE_OK;
201 }
202
203 RedisModuleBlockedClient *blocked_client = NULL;
204
205 /* BLOCK.BLOCK [TIMEOUT] -- Blocks the current client until released
206 * or TIMEOUT seconds. If TIMEOUT is zero, no timeout function is
207 * registered.
208 */
Block_RedisCommand(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)209 int Block_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
210 if (RedisModule_IsBlockedReplyRequest(ctx)) {
211 RedisModuleString *r = RedisModule_GetBlockedClientPrivateData(ctx);
212 return RedisModule_ReplyWithString(ctx, r);
213 } else if (RedisModule_IsBlockedTimeoutRequest(ctx)) {
214 RedisModule_UnblockClient(blocked_client, NULL); /* Must be called to avoid leaks. */
215 blocked_client = NULL;
216 return RedisModule_ReplyWithSimpleString(ctx, "Timed out");
217 }
218
219 if (argc != 2) return RedisModule_WrongArity(ctx);
220 long long timeout;
221
222 if (RedisModule_StringToLongLong(argv[1], &timeout) != REDISMODULE_OK) {
223 return RedisModule_ReplyWithError(ctx, "ERR invalid timeout");
224 }
225 if (blocked_client) {
226 return RedisModule_ReplyWithError(ctx, "ERR another client already blocked");
227 }
228
229 /* Block client. We use this function as both a reply and optional timeout
230 * callback and differentiate the different code flows above.
231 */
232 blocked_client = RedisModule_BlockClient(ctx, Block_RedisCommand,
233 timeout > 0 ? Block_RedisCommand : NULL, HelloBlock_FreeStringData, timeout);
234 return REDISMODULE_OK;
235 }
236
237 /* BLOCK.IS_BLOCKED -- Returns 1 if we have a blocked client, or 0 otherwise.
238 */
IsBlocked_RedisCommand(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)239 int IsBlocked_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
240 UNUSED(argv);
241 UNUSED(argc);
242 RedisModule_ReplyWithLongLong(ctx, blocked_client ? 1 : 0);
243 return REDISMODULE_OK;
244 }
245
246 /* BLOCK.RELEASE [reply] -- Releases the blocked client and produce the specified reply.
247 */
Release_RedisCommand(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)248 int Release_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
249 if (argc != 2) return RedisModule_WrongArity(ctx);
250 if (!blocked_client) {
251 return RedisModule_ReplyWithError(ctx, "ERR No blocked client");
252 }
253
254 RedisModuleString *replystr = argv[1];
255 RedisModule_RetainString(ctx, replystr);
256 RedisModule_UnblockClient(blocked_client, replystr);
257 blocked_client = NULL;
258
259 RedisModule_ReplyWithSimpleString(ctx, "OK");
260
261 return REDISMODULE_OK;
262 }
263
RedisModule_OnLoad(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)264 int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
265 UNUSED(argv);
266 UNUSED(argc);
267
268 if (RedisModule_Init(ctx,"block",1,REDISMODULE_APIVER_1)
269 == REDISMODULE_ERR) return REDISMODULE_ERR;
270
271 if (RedisModule_CreateCommand(ctx,"block.debug",
272 HelloBlock_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
273 return REDISMODULE_ERR;
274
275 if (RedisModule_CreateCommand(ctx,"block.double_debug",
276 HelloDoubleBlock_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
277 return REDISMODULE_ERR;
278
279 if (RedisModule_CreateCommand(ctx,"block.debug_no_track",
280 HelloBlockNoTracking_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
281 return REDISMODULE_ERR;
282
283 if (RedisModule_CreateCommand(ctx, "block.block",
284 Block_RedisCommand, "", 0, 0, 0) == REDISMODULE_ERR)
285 return REDISMODULE_ERR;
286
287 if (RedisModule_CreateCommand(ctx,"block.is_blocked",
288 IsBlocked_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
289 return REDISMODULE_ERR;
290
291 if (RedisModule_CreateCommand(ctx,"block.release",
292 Release_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
293 return REDISMODULE_ERR;
294
295 return REDISMODULE_OK;
296 }
297