1 #define REDISMODULE_EXPERIMENTAL_API
2 #include "redismodule.h"
3
4 #include <string.h>
5 #include <assert.h>
6 #include <unistd.h>
7
8 #define LIST_SIZE 1024
9
10 typedef struct {
11 long long list[LIST_SIZE];
12 long long length;
13 } fsl_t; /* Fixed-size list */
14
15 static RedisModuleType *fsltype = NULL;
16
fsl_type_create()17 fsl_t *fsl_type_create() {
18 fsl_t *o;
19 o = RedisModule_Alloc(sizeof(*o));
20 o->length = 0;
21 return o;
22 }
23
fsl_type_free(fsl_t * o)24 void fsl_type_free(fsl_t *o) {
25 RedisModule_Free(o);
26 }
27
28 /* ========================== "fsltype" type methods ======================= */
29
fsl_rdb_load(RedisModuleIO * rdb,int encver)30 void *fsl_rdb_load(RedisModuleIO *rdb, int encver) {
31 if (encver != 0) {
32 return NULL;
33 }
34 fsl_t *fsl = fsl_type_create();
35 fsl->length = RedisModule_LoadUnsigned(rdb);
36 for (long long i = 0; i < fsl->length; i++)
37 fsl->list[i] = RedisModule_LoadSigned(rdb);
38 return fsl;
39 }
40
fsl_rdb_save(RedisModuleIO * rdb,void * value)41 void fsl_rdb_save(RedisModuleIO *rdb, void *value) {
42 fsl_t *fsl = value;
43 RedisModule_SaveUnsigned(rdb,fsl->length);
44 for (long long i = 0; i < fsl->length; i++)
45 RedisModule_SaveSigned(rdb, fsl->list[i]);
46 }
47
fsl_aofrw(RedisModuleIO * aof,RedisModuleString * key,void * value)48 void fsl_aofrw(RedisModuleIO *aof, RedisModuleString *key, void *value) {
49 fsl_t *fsl = value;
50 for (long long i = 0; i < fsl->length; i++)
51 RedisModule_EmitAOF(aof, "FSL.PUSH","sl", key, fsl->list[i]);
52 }
53
fsl_free(void * value)54 void fsl_free(void *value) {
55 fsl_type_free(value);
56 }
57
58 /* ========================== helper methods ======================= */
59
get_fsl(RedisModuleCtx * ctx,RedisModuleString * keyname,int mode,int create,fsl_t ** fsl,int reply_on_failure)60 int get_fsl(RedisModuleCtx *ctx, RedisModuleString *keyname, int mode, int create, fsl_t **fsl, int reply_on_failure) {
61 RedisModuleKey *key = RedisModule_OpenKey(ctx, keyname, mode);
62
63 int type = RedisModule_KeyType(key);
64 if (type != REDISMODULE_KEYTYPE_EMPTY && RedisModule_ModuleTypeGetType(key) != fsltype) {
65 RedisModule_CloseKey(key);
66 if (reply_on_failure)
67 RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
68 return 0;
69 }
70
71 /* Create an empty value object if the key is currently empty. */
72 if (type == REDISMODULE_KEYTYPE_EMPTY) {
73 if (!create) {
74 /* Key is empty but we cannot create */
75 RedisModule_CloseKey(key);
76 *fsl = NULL;
77 return 1;
78 }
79 *fsl = fsl_type_create();
80 RedisModule_ModuleTypeSetValue(key, fsltype, *fsl);
81 } else {
82 *fsl = RedisModule_ModuleTypeGetValue(key);
83 }
84
85 RedisModule_CloseKey(key);
86 return 1;
87 }
88
89 /* ========================== commands ======================= */
90
91 /* FSL.PUSH <key> <int> - Push an integer to the fixed-size list (to the right).
92 * It must be greater than the element in the head of the list. */
fsl_push(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)93 int fsl_push(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
94 if (argc != 3)
95 return RedisModule_WrongArity(ctx);
96
97 long long ele;
98 if (RedisModule_StringToLongLong(argv[2],&ele) != REDISMODULE_OK)
99 return RedisModule_ReplyWithError(ctx,"ERR invalid integer");
100
101 fsl_t *fsl;
102 if (!get_fsl(ctx, argv[1], REDISMODULE_WRITE, 1, &fsl, 1))
103 return REDISMODULE_OK;
104
105 if (fsl->length == LIST_SIZE)
106 return RedisModule_ReplyWithError(ctx,"ERR list is full");
107
108 if (fsl->length != 0 && fsl->list[fsl->length-1] >= ele)
109 return RedisModule_ReplyWithError(ctx,"ERR new element has to be greater than the head element");
110
111 fsl->list[fsl->length++] = ele;
112 RedisModule_SignalKeyAsReady(ctx, argv[1]);
113
114 return RedisModule_ReplyWithSimpleString(ctx, "OK");
115 }
116
bpop_reply_callback(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)117 int bpop_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
118 REDISMODULE_NOT_USED(argv);
119 REDISMODULE_NOT_USED(argc);
120 RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx);
121
122 fsl_t *fsl;
123 if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0) || !fsl)
124 return REDISMODULE_ERR;
125
126 RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
127 return REDISMODULE_OK;
128 }
129
bpop_timeout_callback(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)130 int bpop_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
131 REDISMODULE_NOT_USED(argv);
132 REDISMODULE_NOT_USED(argc);
133 return RedisModule_ReplyWithSimpleString(ctx, "Request timedout");
134 }
135
136 /* FSL.BPOP <key> <timeout> - Block clients until list has two or more elements.
137 * When that happens, unblock client and pop the last two elements (from the right). */
fsl_bpop(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)138 int fsl_bpop(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
139 if (argc != 3)
140 return RedisModule_WrongArity(ctx);
141
142 long long timeout;
143 if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK || timeout < 0)
144 return RedisModule_ReplyWithError(ctx,"ERR invalid timeout");
145
146 fsl_t *fsl;
147 if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1))
148 return REDISMODULE_OK;
149
150 if (!fsl) {
151 RedisModule_BlockClientOnKeys(ctx, bpop_reply_callback, bpop_timeout_callback,
152 NULL, timeout, &argv[1], 1, NULL);
153 } else {
154 RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
155 }
156
157 return REDISMODULE_OK;
158 }
159
bpopgt_reply_callback(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)160 int bpopgt_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
161 REDISMODULE_NOT_USED(argv);
162 REDISMODULE_NOT_USED(argc);
163 RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx);
164 long long *pgt = RedisModule_GetBlockedClientPrivateData(ctx);
165
166 fsl_t *fsl;
167 if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0) || !fsl)
168 return REDISMODULE_ERR;
169
170 if (fsl->list[fsl->length-1] <= *pgt)
171 return REDISMODULE_ERR;
172
173 RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
174 return REDISMODULE_OK;
175 }
176
bpopgt_timeout_callback(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)177 int bpopgt_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
178 REDISMODULE_NOT_USED(argv);
179 REDISMODULE_NOT_USED(argc);
180 return RedisModule_ReplyWithSimpleString(ctx, "Request timedout");
181 }
182
bpopgt_free_privdata(RedisModuleCtx * ctx,void * privdata)183 void bpopgt_free_privdata(RedisModuleCtx *ctx, void *privdata) {
184 REDISMODULE_NOT_USED(ctx);
185 RedisModule_Free(privdata);
186 }
187
188 /* FSL.BPOPGT <key> <gt> <timeout> - Block clients until list has an element greater than <gt>.
189 * When that happens, unblock client and pop the last element (from the right). */
fsl_bpopgt(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)190 int fsl_bpopgt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
191 if (argc != 4)
192 return RedisModule_WrongArity(ctx);
193
194 long long gt;
195 if (RedisModule_StringToLongLong(argv[2],>) != REDISMODULE_OK)
196 return RedisModule_ReplyWithError(ctx,"ERR invalid integer");
197
198 long long timeout;
199 if (RedisModule_StringToLongLong(argv[3],&timeout) != REDISMODULE_OK || timeout < 0)
200 return RedisModule_ReplyWithError(ctx,"ERR invalid timeout");
201
202 fsl_t *fsl;
203 if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1))
204 return REDISMODULE_OK;
205
206 if (!fsl || fsl->list[fsl->length-1] <= gt) {
207 /* We use malloc so the tests in blockedonkeys.tcl can check for memory leaks */
208 long long *pgt = RedisModule_Alloc(sizeof(long long));
209 *pgt = gt;
210 RedisModule_BlockClientOnKeys(ctx, bpopgt_reply_callback, bpopgt_timeout_callback,
211 bpopgt_free_privdata, timeout, &argv[1], 1, pgt);
212 } else {
213 RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
214 }
215
216 return REDISMODULE_OK;
217 }
218
bpoppush_reply_callback(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)219 int bpoppush_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
220 REDISMODULE_NOT_USED(argv);
221 REDISMODULE_NOT_USED(argc);
222 RedisModuleString *src_keyname = RedisModule_GetBlockedClientReadyKey(ctx);
223 RedisModuleString *dst_keyname = RedisModule_GetBlockedClientPrivateData(ctx);
224
225 fsl_t *src;
226 if (!get_fsl(ctx, src_keyname, REDISMODULE_READ, 0, &src, 0) || !src)
227 return REDISMODULE_ERR;
228
229 fsl_t *dst;
230 if (!get_fsl(ctx, dst_keyname, REDISMODULE_WRITE, 1, &dst, 0) || !dst)
231 return REDISMODULE_ERR;
232
233 long long ele = src->list[--src->length];
234 dst->list[dst->length++] = ele;
235 RedisModule_SignalKeyAsReady(ctx, dst_keyname);
236 return RedisModule_ReplyWithLongLong(ctx, ele);
237 }
238
bpoppush_timeout_callback(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)239 int bpoppush_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
240 REDISMODULE_NOT_USED(argv);
241 REDISMODULE_NOT_USED(argc);
242 return RedisModule_ReplyWithSimpleString(ctx, "Request timedout");
243 }
244
bpoppush_free_privdata(RedisModuleCtx * ctx,void * privdata)245 void bpoppush_free_privdata(RedisModuleCtx *ctx, void *privdata) {
246 RedisModule_FreeString(ctx, privdata);
247 }
248
249 /* FSL.BPOPPUSH <src> <dst> <timeout> - Block clients until <src> has an element.
250 * When that happens, unblock client, pop the last element from <src> and push it to <dst>
251 * (from the right). */
fsl_bpoppush(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)252 int fsl_bpoppush(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
253 if (argc != 4)
254 return RedisModule_WrongArity(ctx);
255
256 long long timeout;
257 if (RedisModule_StringToLongLong(argv[3],&timeout) != REDISMODULE_OK || timeout < 0)
258 return RedisModule_ReplyWithError(ctx,"ERR invalid timeout");
259
260 fsl_t *src;
261 if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &src, 1))
262 return REDISMODULE_OK;
263
264 if (!src) {
265 /* Retain string for reply callback */
266 RedisModule_RetainString(ctx, argv[2]);
267 /* Key is empty, we must block */
268 RedisModule_BlockClientOnKeys(ctx, bpoppush_reply_callback, bpoppush_timeout_callback,
269 bpoppush_free_privdata, timeout, &argv[1], 1, argv[2]);
270 } else {
271 fsl_t *dst;
272 if (!get_fsl(ctx, argv[2], REDISMODULE_WRITE, 1, &dst, 1))
273 return REDISMODULE_OK;
274 long long ele = src->list[--src->length];
275 dst->list[dst->length++] = ele;
276 RedisModule_SignalKeyAsReady(ctx, argv[2]);
277 RedisModule_ReplyWithLongLong(ctx, ele);
278 }
279
280 return REDISMODULE_OK;
281 }
282
283 /* FSL.GETALL <key> - Reply with an array containing all elements. */
fsl_getall(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)284 int fsl_getall(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
285 if (argc != 2)
286 return RedisModule_WrongArity(ctx);
287
288 fsl_t *fsl;
289 if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1))
290 return REDISMODULE_OK;
291
292 if (!fsl)
293 return RedisModule_ReplyWithArray(ctx, 0);
294
295 RedisModule_ReplyWithArray(ctx, fsl->length);
296 for (int i = 0; i < fsl->length; i++)
297 RedisModule_ReplyWithLongLong(ctx, fsl->list[i]);
298 return REDISMODULE_OK;
299 }
300
RedisModule_OnLoad(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)301 int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
302 REDISMODULE_NOT_USED(argv);
303 REDISMODULE_NOT_USED(argc);
304
305 if (RedisModule_Init(ctx, "blockonkeys", 1, REDISMODULE_APIVER_1)== REDISMODULE_ERR)
306 return REDISMODULE_ERR;
307
308 RedisModuleTypeMethods tm = {
309 .version = REDISMODULE_TYPE_METHOD_VERSION,
310 .rdb_load = fsl_rdb_load,
311 .rdb_save = fsl_rdb_save,
312 .aof_rewrite = fsl_aofrw,
313 .mem_usage = NULL,
314 .free = fsl_free,
315 .digest = NULL
316 };
317
318 fsltype = RedisModule_CreateDataType(ctx, "fsltype_t", 0, &tm);
319 if (fsltype == NULL)
320 return REDISMODULE_ERR;
321
322 if (RedisModule_CreateCommand(ctx,"fsl.push",fsl_push,"",0,0,0) == REDISMODULE_ERR)
323 return REDISMODULE_ERR;
324
325 if (RedisModule_CreateCommand(ctx,"fsl.bpop",fsl_bpop,"",0,0,0) == REDISMODULE_ERR)
326 return REDISMODULE_ERR;
327
328 if (RedisModule_CreateCommand(ctx,"fsl.bpopgt",fsl_bpopgt,"",0,0,0) == REDISMODULE_ERR)
329 return REDISMODULE_ERR;
330
331 if (RedisModule_CreateCommand(ctx,"fsl.bpoppush",fsl_bpoppush,"",0,0,0) == REDISMODULE_ERR)
332 return REDISMODULE_ERR;
333
334 if (RedisModule_CreateCommand(ctx,"fsl.getall",fsl_getall,"",0,0,0) == REDISMODULE_ERR)
335 return REDISMODULE_ERR;
336
337 return REDISMODULE_OK;
338 }
339