1 #define REDISMODULE_EXPERIMENTAL_API
2 #define _XOPEN_SOURCE 700
3 #include "redismodule.h"
4 #include <stdio.h>
5 #include <stdlib.h>
6 #include <pthread.h>
7 #include <time.h>
8 
/* Marks a variable or parameter as deliberately unused so the compiler does
 * not warn about it. The argument is evaluated and its value discarded. */
#define UNUSED(x) ((void)(x))
10 
11 /* Reply callback for blocking command BLOCK.DEBUG */
HelloBlock_Reply(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)12 int HelloBlock_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
13     UNUSED(argv);
14     UNUSED(argc);
15     int *myint = RedisModule_GetBlockedClientPrivateData(ctx);
16     return RedisModule_ReplyWithLongLong(ctx,*myint);
17 }
18 
19 /* Timeout callback for blocking command BLOCK.DEBUG */
HelloBlock_Timeout(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)20 int HelloBlock_Timeout(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
21     UNUSED(argv);
22     UNUSED(argc);
23     RedisModuleBlockedClient *bc = RedisModule_GetBlockedClientHandle(ctx);
24     RedisModule_BlockedClientMeasureTimeEnd(bc);
25     return RedisModule_ReplyWithSimpleString(ctx,"Request timedout");
26 }
27 
28 /* Private data freeing callback for BLOCK.DEBUG command. */
HelloBlock_FreeData(RedisModuleCtx * ctx,void * privdata)29 void HelloBlock_FreeData(RedisModuleCtx *ctx, void *privdata) {
30     UNUSED(ctx);
31     RedisModule_Free(privdata);
32 }
33 
34 /* Private data freeing callback for BLOCK.BLOCK command. */
HelloBlock_FreeStringData(RedisModuleCtx * ctx,void * privdata)35 void HelloBlock_FreeStringData(RedisModuleCtx *ctx, void *privdata) {
36     RedisModule_FreeString(ctx, (RedisModuleString*)privdata);
37 }
38 
39 /* The thread entry point that actually executes the blocking part
40  * of the command BLOCK.DEBUG. */
BlockDebug_ThreadMain(void * arg)41 void *BlockDebug_ThreadMain(void *arg) {
42     void **targ = arg;
43     RedisModuleBlockedClient *bc = targ[0];
44     long long delay = (unsigned long)targ[1];
45     long long enable_time_track = (unsigned long)targ[2];
46     if (enable_time_track)
47         RedisModule_BlockedClientMeasureTimeStart(bc);
48     RedisModule_Free(targ);
49 
50     struct timespec ts;
51     ts.tv_sec = delay / 1000;
52     ts.tv_nsec = (delay % 1000) * 1000000;
53     nanosleep(&ts, NULL);
54     int *r = RedisModule_Alloc(sizeof(int));
55     *r = rand();
56     if (enable_time_track)
57         RedisModule_BlockedClientMeasureTimeEnd(bc);
58     RedisModule_UnblockClient(bc,r);
59     return NULL;
60 }
61 
62 /* The thread entry point that actually executes the blocking part
63  * of the command BLOCK.DOUBLE_DEBUG. */
DoubleBlock_ThreadMain(void * arg)64 void *DoubleBlock_ThreadMain(void *arg) {
65     void **targ = arg;
66     RedisModuleBlockedClient *bc = targ[0];
67     long long delay = (unsigned long)targ[1];
68     RedisModule_BlockedClientMeasureTimeStart(bc);
69     RedisModule_Free(targ);
70     struct timespec ts;
71     ts.tv_sec = delay / 1000;
72     ts.tv_nsec = (delay % 1000) * 1000000;
73     nanosleep(&ts, NULL);
74     int *r = RedisModule_Alloc(sizeof(int));
75     *r = rand();
76     RedisModule_BlockedClientMeasureTimeEnd(bc);
77     /* call again RedisModule_BlockedClientMeasureTimeStart() and
78      * RedisModule_BlockedClientMeasureTimeEnd and ensure that the
79      * total execution time is 2x the delay. */
80     RedisModule_BlockedClientMeasureTimeStart(bc);
81     nanosleep(&ts, NULL);
82     RedisModule_BlockedClientMeasureTimeEnd(bc);
83 
84     RedisModule_UnblockClient(bc,r);
85     return NULL;
86 }
87 
/* Disconnect callback installed by BLOCK.DEBUG and BLOCK.DEBUG_NO_TRACK.
 * It only logs, at "warning" level, the address of the blocked client
 * handle it was called with. */
void HelloBlock_Disconnected(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc) {
    void *handle = (void*)bc;
    RedisModule_Log(ctx,"warning","Blocked client %p disconnected!",handle);
}
92 
93 /* BLOCK.DEBUG <delay_ms> <timeout_ms> -- Block for <count> milliseconds, then reply with
94  * a random number. Timeout is the command timeout, so that you can test
95  * what happens when the delay is greater than the timeout. */
HelloBlock_RedisCommand(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)96 int HelloBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
97     if (argc != 3) return RedisModule_WrongArity(ctx);
98     long long delay;
99     long long timeout;
100 
101     if (RedisModule_StringToLongLong(argv[1],&delay) != REDISMODULE_OK) {
102         return RedisModule_ReplyWithError(ctx,"ERR invalid count");
103     }
104 
105     if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK) {
106         return RedisModule_ReplyWithError(ctx,"ERR invalid count");
107     }
108 
109     pthread_t tid;
110     RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout);
111 
112     /* Here we set a disconnection handler, however since this module will
113      * block in sleep() in a thread, there is not much we can do in the
114      * callback, so this is just to show you the API. */
115     RedisModule_SetDisconnectCallback(bc,HelloBlock_Disconnected);
116 
117     /* Now that we setup a blocking client, we need to pass the control
118      * to the thread. However we need to pass arguments to the thread:
119      * the delay and a reference to the blocked client handle. */
120     void **targ = RedisModule_Alloc(sizeof(void*)*3);
121     targ[0] = bc;
122     targ[1] = (void*)(unsigned long) delay;
123     // pass 1 as flag to enable time tracking
124     targ[2] = (void*)(unsigned long) 1;
125 
126     if (pthread_create(&tid,NULL,BlockDebug_ThreadMain,targ) != 0) {
127         RedisModule_AbortBlock(bc);
128         return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
129     }
130     return REDISMODULE_OK;
131 }
132 
133 /* BLOCK.DEBUG_NOTRACKING <delay_ms> <timeout_ms> -- Block for <count> milliseconds, then reply with
134  * a random number. Timeout is the command timeout, so that you can test
135  * what happens when the delay is greater than the timeout.
136  * this command does not track background time so the background time should no appear in stats*/
HelloBlockNoTracking_RedisCommand(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)137 int HelloBlockNoTracking_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
138     if (argc != 3) return RedisModule_WrongArity(ctx);
139     long long delay;
140     long long timeout;
141 
142     if (RedisModule_StringToLongLong(argv[1],&delay) != REDISMODULE_OK) {
143         return RedisModule_ReplyWithError(ctx,"ERR invalid count");
144     }
145 
146     if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK) {
147         return RedisModule_ReplyWithError(ctx,"ERR invalid count");
148     }
149 
150     pthread_t tid;
151     RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout);
152 
153     /* Here we set a disconnection handler, however since this module will
154      * block in sleep() in a thread, there is not much we can do in the
155      * callback, so this is just to show you the API. */
156     RedisModule_SetDisconnectCallback(bc,HelloBlock_Disconnected);
157 
158     /* Now that we setup a blocking client, we need to pass the control
159      * to the thread. However we need to pass arguments to the thread:
160      * the delay and a reference to the blocked client handle. */
161     void **targ = RedisModule_Alloc(sizeof(void*)*3);
162     targ[0] = bc;
163     targ[1] = (void*)(unsigned long) delay;
164     // pass 0 as flag to enable time tracking
165     targ[2] = (void*)(unsigned long) 0;
166 
167     if (pthread_create(&tid,NULL,BlockDebug_ThreadMain,targ) != 0) {
168         RedisModule_AbortBlock(bc);
169         return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
170     }
171     return REDISMODULE_OK;
172 }
173 
174 /* BLOCK.DOUBLE_DEBUG <delay_ms> -- Block for 2 x <count> milliseconds,
175  * then reply with a random number.
176  * This command is used to test multiple calls to RedisModule_BlockedClientMeasureTimeStart()
177  * and RedisModule_BlockedClientMeasureTimeEnd() within the same execution. */
HelloDoubleBlock_RedisCommand(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)178 int HelloDoubleBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
179     if (argc != 2) return RedisModule_WrongArity(ctx);
180     long long delay;
181 
182     if (RedisModule_StringToLongLong(argv[1],&delay) != REDISMODULE_OK) {
183         return RedisModule_ReplyWithError(ctx,"ERR invalid count");
184     }
185 
186     pthread_t tid;
187     RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,0);
188 
189     /* Now that we setup a blocking client, we need to pass the control
190      * to the thread. However we need to pass arguments to the thread:
191      * the delay and a reference to the blocked client handle. */
192     void **targ = RedisModule_Alloc(sizeof(void*)*2);
193     targ[0] = bc;
194     targ[1] = (void*)(unsigned long) delay;
195 
196     if (pthread_create(&tid,NULL,DoubleBlock_ThreadMain,targ) != 0) {
197         RedisModule_AbortBlock(bc);
198         return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
199     }
200     return REDISMODULE_OK;
201 }
202 
203 RedisModuleBlockedClient *blocked_client = NULL;
204 
205 /* BLOCK.BLOCK [TIMEOUT] -- Blocks the current client until released
206  * or TIMEOUT seconds. If TIMEOUT is zero, no timeout function is
207  * registered.
208  */
Block_RedisCommand(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)209 int Block_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
210     if (RedisModule_IsBlockedReplyRequest(ctx)) {
211         RedisModuleString *r = RedisModule_GetBlockedClientPrivateData(ctx);
212         return RedisModule_ReplyWithString(ctx, r);
213     } else if (RedisModule_IsBlockedTimeoutRequest(ctx)) {
214         RedisModule_UnblockClient(blocked_client, NULL); /* Must be called to avoid leaks. */
215         blocked_client = NULL;
216         return RedisModule_ReplyWithSimpleString(ctx, "Timed out");
217     }
218 
219     if (argc != 2) return RedisModule_WrongArity(ctx);
220     long long timeout;
221 
222     if (RedisModule_StringToLongLong(argv[1], &timeout) != REDISMODULE_OK) {
223         return RedisModule_ReplyWithError(ctx, "ERR invalid timeout");
224     }
225     if (blocked_client) {
226         return RedisModule_ReplyWithError(ctx, "ERR another client already blocked");
227     }
228 
229     /* Block client. We use this function as both a reply and optional timeout
230      * callback and differentiate the different code flows above.
231      */
232     blocked_client = RedisModule_BlockClient(ctx, Block_RedisCommand,
233             timeout > 0 ? Block_RedisCommand : NULL, HelloBlock_FreeStringData, timeout);
234     return REDISMODULE_OK;
235 }
236 
237 /* BLOCK.IS_BLOCKED -- Returns 1 if we have a blocked client, or 0 otherwise.
238  */
IsBlocked_RedisCommand(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)239 int IsBlocked_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
240     UNUSED(argv);
241     UNUSED(argc);
242     RedisModule_ReplyWithLongLong(ctx, blocked_client ? 1 : 0);
243     return REDISMODULE_OK;
244 }
245 
246 /* BLOCK.RELEASE [reply] -- Releases the blocked client and produce the specified reply.
247  */
Release_RedisCommand(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)248 int Release_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
249     if (argc != 2) return RedisModule_WrongArity(ctx);
250     if (!blocked_client) {
251         return RedisModule_ReplyWithError(ctx, "ERR No blocked client");
252     }
253 
254     RedisModuleString *replystr = argv[1];
255     RedisModule_RetainString(ctx, replystr);
256     RedisModule_UnblockClient(blocked_client, replystr);
257     blocked_client = NULL;
258 
259     RedisModule_ReplyWithSimpleString(ctx, "OK");
260 
261     return REDISMODULE_OK;
262 }
263 
/* Module entry point: initializes the module under the name "block"
 * (version 1) and registers its commands. Load arguments are ignored.
 * Returns REDISMODULE_ERR as soon as initialization or any registration
 * fails, REDISMODULE_OK otherwise. */
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    UNUSED(argv);
    UNUSED(argc);

    /* Commands in registration order; every one is created with empty
     * flags and 0,0,0 as key specification. */
    static const struct {
        const char *name;
        int (*handler)(RedisModuleCtx *, RedisModuleString **, int);
    } commands[] = {
        {"block.debug",          HelloBlock_RedisCommand},
        {"block.double_debug",   HelloDoubleBlock_RedisCommand},
        {"block.debug_no_track", HelloBlockNoTracking_RedisCommand},
        {"block.block",          Block_RedisCommand},
        {"block.is_blocked",     IsBlocked_RedisCommand},
        {"block.release",        Release_RedisCommand},
    };

    if (RedisModule_Init(ctx,"block",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR) {
        return REDISMODULE_ERR;
    }

    for (size_t i = 0; i < sizeof(commands)/sizeof(commands[0]); i++) {
        if (RedisModule_CreateCommand(ctx,commands[i].name,
            commands[i].handler,"",0,0,0) == REDISMODULE_ERR) {
            return REDISMODULE_ERR;
        }
    }

    return REDISMODULE_OK;
}
297