1 /* Helloblock module -- An example of blocking command implementation
2  * with threads.
3  *
4  * -----------------------------------------------------------------------------
5  *
6  * Copyright (c) 2016, Salvatore Sanfilippo <antirez at gmail dot com>
7  * All rights reserved.
8  *
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions are met:
11  *
12  *   * Redistributions of source code must retain the above copyright notice,
13  *     this list of conditions and the following disclaimer.
14  *   * Redistributions in binary form must reproduce the above copyright
15  *     notice, this list of conditions and the following disclaimer in the
16  *     documentation and/or other materials provided with the distribution.
17  *   * Neither the name of Redis nor the names of its contributors may be used
18  *     to endorse or promote products derived from this software without
19  *     specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
22  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
24  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
25  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
26  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
27  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
28  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
29  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
30  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31  * POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #define REDISMODULE_EXPERIMENTAL_API
35 #include "../redismodule.h"
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <pthread.h>
39 #include <unistd.h>
40 
41 /* Reply callback for blocking command HELLO.BLOCK */
HelloBlock_Reply(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)42 int HelloBlock_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
43     REDISMODULE_NOT_USED(argv);
44     REDISMODULE_NOT_USED(argc);
45     int *myint = RedisModule_GetBlockedClientPrivateData(ctx);
46     return RedisModule_ReplyWithLongLong(ctx,*myint);
47 }
48 
49 /* Timeout callback for blocking command HELLO.BLOCK */
HelloBlock_Timeout(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)50 int HelloBlock_Timeout(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
51     REDISMODULE_NOT_USED(argv);
52     REDISMODULE_NOT_USED(argc);
53     return RedisModule_ReplyWithSimpleString(ctx,"Request timedout");
54 }
55 
56 /* Private data freeing callback for HELLO.BLOCK command. */
HelloBlock_FreeData(RedisModuleCtx * ctx,void * privdata)57 void HelloBlock_FreeData(RedisModuleCtx *ctx, void *privdata) {
58     REDISMODULE_NOT_USED(ctx);
59     RedisModule_Free(privdata);
60 }
61 
62 /* The thread entry point that actually executes the blocking part
63  * of the command HELLO.BLOCK. */
HelloBlock_ThreadMain(void * arg)64 void *HelloBlock_ThreadMain(void *arg) {
65     void **targ = arg;
66     RedisModuleBlockedClient *bc = targ[0];
67     long long delay = (unsigned long)targ[1];
68     RedisModule_Free(targ);
69 
70     sleep(delay);
71     int *r = RedisModule_Alloc(sizeof(int));
72     *r = rand();
73     RedisModule_UnblockClient(bc,r);
74     return NULL;
75 }
76 
77 /* An example blocked client disconnection callback.
78  *
79  * Note that in the case of the HELLO.BLOCK command, the blocked client is now
80  * owned by the thread calling sleep(). In this specific case, there is not
81  * much we can do, however normally we could instead implement a way to
82  * signal the thread that the client disconnected, and sleep the specified
83  * amount of seconds with a while loop calling sleep(1), so that once we
84  * detect the client disconnection, we can terminate the thread ASAP. */
HelloBlock_Disconnected(RedisModuleCtx * ctx,RedisModuleBlockedClient * bc)85 void HelloBlock_Disconnected(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc) {
86     RedisModule_Log(ctx,"warning","Blocked client %p disconnected!",
87         (void*)bc);
88 
89     /* Here you should cleanup your state / threads, and if possible
90      * call RedisModule_UnblockClient(), or notify the thread that will
91      * call the function ASAP. */
92 }
93 
94 /* HELLO.BLOCK <delay> <timeout> -- Block for <count> seconds, then reply with
95  * a random number. Timeout is the command timeout, so that you can test
96  * what happens when the delay is greater than the timeout. */
HelloBlock_RedisCommand(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)97 int HelloBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
98     if (argc != 3) return RedisModule_WrongArity(ctx);
99     long long delay;
100     long long timeout;
101 
102     if (RedisModule_StringToLongLong(argv[1],&delay) != REDISMODULE_OK) {
103         return RedisModule_ReplyWithError(ctx,"ERR invalid count");
104     }
105 
106     if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK) {
107         return RedisModule_ReplyWithError(ctx,"ERR invalid count");
108     }
109 
110     pthread_t tid;
111     RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout);
112 
113     /* Here we set a disconnection handler, however since this module will
114      * block in sleep() in a thread, there is not much we can do in the
115      * callback, so this is just to show you the API. */
116     RedisModule_SetDisconnectCallback(bc,HelloBlock_Disconnected);
117 
118     /* Now that we setup a blocking client, we need to pass the control
119      * to the thread. However we need to pass arguments to the thread:
120      * the delay and a reference to the blocked client handle. */
121     void **targ = RedisModule_Alloc(sizeof(void*)*2);
122     targ[0] = bc;
123     targ[1] = (void*)(unsigned long) delay;
124 
125     if (pthread_create(&tid,NULL,HelloBlock_ThreadMain,targ) != 0) {
126         RedisModule_AbortBlock(bc);
127         return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
128     }
129     return REDISMODULE_OK;
130 }
131 
132 /* The thread entry point that actually executes the blocking part
133  * of the command HELLO.KEYS.
134  *
135  * Note: this implementation is very simple on purpose, so no duplicated
136  * keys (returned by SCAN) are filtered. However adding such a functionality
137  * would be trivial just using any data structure implementing a dictionary
138  * in order to filter the duplicated items. */
HelloKeys_ThreadMain(void * arg)139 void *HelloKeys_ThreadMain(void *arg) {
140     RedisModuleBlockedClient *bc = arg;
141     RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bc);
142     long long cursor = 0;
143     size_t replylen = 0;
144 
145     RedisModule_ReplyWithArray(ctx,REDISMODULE_POSTPONED_ARRAY_LEN);
146     do {
147         RedisModule_ThreadSafeContextLock(ctx);
148         RedisModuleCallReply *reply = RedisModule_Call(ctx,
149             "SCAN","l",(long long)cursor);
150         RedisModule_ThreadSafeContextUnlock(ctx);
151 
152         RedisModuleCallReply *cr_cursor =
153             RedisModule_CallReplyArrayElement(reply,0);
154         RedisModuleCallReply *cr_keys =
155             RedisModule_CallReplyArrayElement(reply,1);
156 
157         RedisModuleString *s = RedisModule_CreateStringFromCallReply(cr_cursor);
158         RedisModule_StringToLongLong(s,&cursor);
159         RedisModule_FreeString(ctx,s);
160 
161         size_t items = RedisModule_CallReplyLength(cr_keys);
162         for (size_t j = 0; j < items; j++) {
163             RedisModuleCallReply *ele =
164                 RedisModule_CallReplyArrayElement(cr_keys,j);
165             RedisModule_ReplyWithCallReply(ctx,ele);
166             replylen++;
167         }
168         RedisModule_FreeCallReply(reply);
169     } while (cursor != 0);
170     RedisModule_ReplySetArrayLength(ctx,replylen);
171 
172     RedisModule_FreeThreadSafeContext(ctx);
173     RedisModule_UnblockClient(bc,NULL);
174     return NULL;
175 }
176 
177 /* HELLO.KEYS -- Return all the keys in the current database without blocking
178  * the server. The keys do not represent a point-in-time state so only the keys
179  * that were in the database from the start to the end are guaranteed to be
180  * there. */
HelloKeys_RedisCommand(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)181 int HelloKeys_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
182     REDISMODULE_NOT_USED(argv);
183     if (argc != 1) return RedisModule_WrongArity(ctx);
184 
185     pthread_t tid;
186 
187     /* Note that when blocking the client we do not set any callback: no
188      * timeout is possible since we passed '0', nor we need a reply callback
189      * because we'll use the thread safe context to accumulate a reply. */
190     RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,NULL,NULL,NULL,0);
191 
192     /* Now that we setup a blocking client, we need to pass the control
193      * to the thread. However we need to pass arguments to the thread:
194      * the reference to the blocked client handle. */
195     if (pthread_create(&tid,NULL,HelloKeys_ThreadMain,bc) != 0) {
196         RedisModule_AbortBlock(bc);
197         return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
198     }
199     return REDISMODULE_OK;
200 }
201 
202 /* This function must be present on each Redis module. It is used in order to
203  * register the commands into the Redis server. */
RedisModule_OnLoad(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)204 int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
205     REDISMODULE_NOT_USED(argv);
206     REDISMODULE_NOT_USED(argc);
207 
208     if (RedisModule_Init(ctx,"helloblock",1,REDISMODULE_APIVER_1)
209         == REDISMODULE_ERR) return REDISMODULE_ERR;
210 
211     if (RedisModule_CreateCommand(ctx,"hello.block",
212         HelloBlock_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
213         return REDISMODULE_ERR;
214     if (RedisModule_CreateCommand(ctx,"hello.keys",
215         HelloKeys_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
216         return REDISMODULE_ERR;
217 
218     return REDISMODULE_OK;
219 }
220