1 /* Helloblock module -- An example of blocking command implementation
2 * with threads.
3 *
4 * -----------------------------------------------------------------------------
5 *
6 * Copyright (c) 2016, Salvatore Sanfilippo <antirez at gmail dot com>
7 * All rights reserved.
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are met:
11 *
12 * * Redistributions of source code must retain the above copyright notice,
13 * this list of conditions and the following disclaimer.
14 * * Redistributions in binary form must reproduce the above copyright
15 * notice, this list of conditions and the following disclaimer in the
16 * documentation and/or other materials provided with the distribution.
17 * * Neither the name of Redis nor the names of its contributors may be used
18 * to endorse or promote products derived from this software without
19 * specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
22 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
24 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
25 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
26 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
27 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
28 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
29 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
30 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31 * POSSIBILITY OF SUCH DAMAGE.
32 */
33
34 #define REDISMODULE_EXPERIMENTAL_API
35 #include "../redismodule.h"
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <pthread.h>
39 #include <unistd.h>
40
41 /* Reply callback for blocking command HELLO.BLOCK */
HelloBlock_Reply(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)42 int HelloBlock_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
43 REDISMODULE_NOT_USED(argv);
44 REDISMODULE_NOT_USED(argc);
45 int *myint = RedisModule_GetBlockedClientPrivateData(ctx);
46 return RedisModule_ReplyWithLongLong(ctx,*myint);
47 }
48
49 /* Timeout callback for blocking command HELLO.BLOCK */
HelloBlock_Timeout(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)50 int HelloBlock_Timeout(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
51 REDISMODULE_NOT_USED(argv);
52 REDISMODULE_NOT_USED(argc);
53 return RedisModule_ReplyWithSimpleString(ctx,"Request timedout");
54 }
55
56 /* Private data freeing callback for HELLO.BLOCK command. */
HelloBlock_FreeData(RedisModuleCtx * ctx,void * privdata)57 void HelloBlock_FreeData(RedisModuleCtx *ctx, void *privdata) {
58 REDISMODULE_NOT_USED(ctx);
59 RedisModule_Free(privdata);
60 }
61
62 /* The thread entry point that actually executes the blocking part
63 * of the command HELLO.BLOCK. */
HelloBlock_ThreadMain(void * arg)64 void *HelloBlock_ThreadMain(void *arg) {
65 void **targ = arg;
66 RedisModuleBlockedClient *bc = targ[0];
67 long long delay = (unsigned long)targ[1];
68 RedisModule_Free(targ);
69
70 sleep(delay);
71 int *r = RedisModule_Alloc(sizeof(int));
72 *r = rand();
73 RedisModule_UnblockClient(bc,r);
74 return NULL;
75 }
76
77 /* An example blocked client disconnection callback.
78 *
79 * Note that in the case of the HELLO.BLOCK command, the blocked client is now
80 * owned by the thread calling sleep(). In this specific case, there is not
81 * much we can do, however normally we could instead implement a way to
82 * signal the thread that the client disconnected, and sleep the specified
83 * amount of seconds with a while loop calling sleep(1), so that once we
84 * detect the client disconnection, we can terminate the thread ASAP. */
HelloBlock_Disconnected(RedisModuleCtx * ctx,RedisModuleBlockedClient * bc)85 void HelloBlock_Disconnected(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc) {
86 RedisModule_Log(ctx,"warning","Blocked client %p disconnected!",
87 (void*)bc);
88
89 /* Here you should cleanup your state / threads, and if possible
90 * call RedisModule_UnblockClient(), or notify the thread that will
91 * call the function ASAP. */
92 }
93
94 /* HELLO.BLOCK <delay> <timeout> -- Block for <count> seconds, then reply with
95 * a random number. Timeout is the command timeout, so that you can test
96 * what happens when the delay is greater than the timeout. */
HelloBlock_RedisCommand(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)97 int HelloBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
98 if (argc != 3) return RedisModule_WrongArity(ctx);
99 long long delay;
100 long long timeout;
101
102 if (RedisModule_StringToLongLong(argv[1],&delay) != REDISMODULE_OK) {
103 return RedisModule_ReplyWithError(ctx,"ERR invalid count");
104 }
105
106 if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK) {
107 return RedisModule_ReplyWithError(ctx,"ERR invalid count");
108 }
109
110 pthread_t tid;
111 RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout);
112
113 /* Here we set a disconnection handler, however since this module will
114 * block in sleep() in a thread, there is not much we can do in the
115 * callback, so this is just to show you the API. */
116 RedisModule_SetDisconnectCallback(bc,HelloBlock_Disconnected);
117
118 /* Now that we setup a blocking client, we need to pass the control
119 * to the thread. However we need to pass arguments to the thread:
120 * the delay and a reference to the blocked client handle. */
121 void **targ = RedisModule_Alloc(sizeof(void*)*2);
122 targ[0] = bc;
123 targ[1] = (void*)(unsigned long) delay;
124
125 if (pthread_create(&tid,NULL,HelloBlock_ThreadMain,targ) != 0) {
126 RedisModule_AbortBlock(bc);
127 return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
128 }
129 return REDISMODULE_OK;
130 }
131
132 /* The thread entry point that actually executes the blocking part
133 * of the command HELLO.KEYS.
134 *
135 * Note: this implementation is very simple on purpose, so no duplicated
136 * keys (returned by SCAN) are filtered. However adding such a functionality
137 * would be trivial just using any data structure implementing a dictionary
138 * in order to filter the duplicated items. */
HelloKeys_ThreadMain(void * arg)139 void *HelloKeys_ThreadMain(void *arg) {
140 RedisModuleBlockedClient *bc = arg;
141 RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bc);
142 long long cursor = 0;
143 size_t replylen = 0;
144
145 RedisModule_ReplyWithArray(ctx,REDISMODULE_POSTPONED_LEN);
146 do {
147 RedisModule_ThreadSafeContextLock(ctx);
148 RedisModuleCallReply *reply = RedisModule_Call(ctx,
149 "SCAN","l",(long long)cursor);
150 RedisModule_ThreadSafeContextUnlock(ctx);
151
152 RedisModuleCallReply *cr_cursor =
153 RedisModule_CallReplyArrayElement(reply,0);
154 RedisModuleCallReply *cr_keys =
155 RedisModule_CallReplyArrayElement(reply,1);
156
157 RedisModuleString *s = RedisModule_CreateStringFromCallReply(cr_cursor);
158 RedisModule_StringToLongLong(s,&cursor);
159 RedisModule_FreeString(ctx,s);
160
161 size_t items = RedisModule_CallReplyLength(cr_keys);
162 for (size_t j = 0; j < items; j++) {
163 RedisModuleCallReply *ele =
164 RedisModule_CallReplyArrayElement(cr_keys,j);
165 RedisModule_ReplyWithCallReply(ctx,ele);
166 replylen++;
167 }
168 RedisModule_FreeCallReply(reply);
169 } while (cursor != 0);
170 RedisModule_ReplySetArrayLength(ctx,replylen);
171
172 RedisModule_FreeThreadSafeContext(ctx);
173 RedisModule_UnblockClient(bc,NULL);
174 return NULL;
175 }
176
177 /* HELLO.KEYS -- Return all the keys in the current database without blocking
178 * the server. The keys do not represent a point-in-time state so only the keys
179 * that were in the database from the start to the end are guaranteed to be
180 * there. */
HelloKeys_RedisCommand(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)181 int HelloKeys_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
182 REDISMODULE_NOT_USED(argv);
183 if (argc != 1) return RedisModule_WrongArity(ctx);
184
185 pthread_t tid;
186
187 /* Note that when blocking the client we do not set any callback: no
188 * timeout is possible since we passed '0', nor we need a reply callback
189 * because we'll use the thread safe context to accumulate a reply. */
190 RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,NULL,NULL,NULL,0);
191
192 /* Now that we setup a blocking client, we need to pass the control
193 * to the thread. However we need to pass arguments to the thread:
194 * the reference to the blocked client handle. */
195 if (pthread_create(&tid,NULL,HelloKeys_ThreadMain,bc) != 0) {
196 RedisModule_AbortBlock(bc);
197 return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
198 }
199 return REDISMODULE_OK;
200 }
201
202 /* This function must be present on each Redis module. It is used in order to
203 * register the commands into the Redis server. */
RedisModule_OnLoad(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)204 int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
205 REDISMODULE_NOT_USED(argv);
206 REDISMODULE_NOT_USED(argc);
207
208 if (RedisModule_Init(ctx,"helloblock",1,REDISMODULE_APIVER_1)
209 == REDISMODULE_ERR) return REDISMODULE_ERR;
210
211 if (RedisModule_CreateCommand(ctx,"hello.block",
212 HelloBlock_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
213 return REDISMODULE_ERR;
214 if (RedisModule_CreateCommand(ctx,"hello.keys",
215 HelloKeys_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
216 return REDISMODULE_ERR;
217
218 return REDISMODULE_OK;
219 }
220