1 #define REDISMODULE_EXPERIMENTAL_API
2 #include "redismodule.h"
3 
4 #include <string.h>
5 #include <assert.h>
6 #include <unistd.h>
7 
8 #define LIST_SIZE 1024
9 
10 typedef struct {
11     long long list[LIST_SIZE];
12     long long length;
13 } fsl_t; /* Fixed-size list */
14 
15 static RedisModuleType *fsltype = NULL;
16 
fsl_type_create()17 fsl_t *fsl_type_create() {
18     fsl_t *o;
19     o = RedisModule_Alloc(sizeof(*o));
20     o->length = 0;
21     return o;
22 }
23 
fsl_type_free(fsl_t * o)24 void fsl_type_free(fsl_t *o) {
25     RedisModule_Free(o);
26 }
27 
28 /* ========================== "fsltype" type methods ======================= */
29 
fsl_rdb_load(RedisModuleIO * rdb,int encver)30 void *fsl_rdb_load(RedisModuleIO *rdb, int encver) {
31     if (encver != 0) {
32         return NULL;
33     }
34     fsl_t *fsl = fsl_type_create();
35     fsl->length = RedisModule_LoadUnsigned(rdb);
36     for (long long i = 0; i < fsl->length; i++)
37         fsl->list[i] = RedisModule_LoadSigned(rdb);
38     return fsl;
39 }
40 
fsl_rdb_save(RedisModuleIO * rdb,void * value)41 void fsl_rdb_save(RedisModuleIO *rdb, void *value) {
42     fsl_t *fsl = value;
43     RedisModule_SaveUnsigned(rdb,fsl->length);
44     for (long long i = 0; i < fsl->length; i++)
45         RedisModule_SaveSigned(rdb, fsl->list[i]);
46 }
47 
fsl_aofrw(RedisModuleIO * aof,RedisModuleString * key,void * value)48 void fsl_aofrw(RedisModuleIO *aof, RedisModuleString *key, void *value) {
49     fsl_t *fsl = value;
50     for (long long i = 0; i < fsl->length; i++)
51         RedisModule_EmitAOF(aof, "FSL.PUSH","sl", key, fsl->list[i]);
52 }
53 
fsl_free(void * value)54 void fsl_free(void *value) {
55     fsl_type_free(value);
56 }
57 
58 /* ========================== helper methods ======================= */
59 
get_fsl(RedisModuleCtx * ctx,RedisModuleString * keyname,int mode,int create,fsl_t ** fsl,int reply_on_failure)60 int get_fsl(RedisModuleCtx *ctx, RedisModuleString *keyname, int mode, int create, fsl_t **fsl, int reply_on_failure) {
61     RedisModuleKey *key = RedisModule_OpenKey(ctx, keyname, mode);
62 
63     int type = RedisModule_KeyType(key);
64     if (type != REDISMODULE_KEYTYPE_EMPTY && RedisModule_ModuleTypeGetType(key) != fsltype) {
65         RedisModule_CloseKey(key);
66         if (reply_on_failure)
67             RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
68         return 0;
69     }
70 
71     /* Create an empty value object if the key is currently empty. */
72     if (type == REDISMODULE_KEYTYPE_EMPTY) {
73         if (!create) {
74             /* Key is empty but we cannot create */
75             RedisModule_CloseKey(key);
76             *fsl = NULL;
77             return 1;
78         }
79         *fsl = fsl_type_create();
80         RedisModule_ModuleTypeSetValue(key, fsltype, *fsl);
81     } else {
82         *fsl = RedisModule_ModuleTypeGetValue(key);
83     }
84 
85     RedisModule_CloseKey(key);
86     return 1;
87 }
88 
89 /* ========================== commands ======================= */
90 
91 /* FSL.PUSH <key> <int> - Push an integer to the fixed-size list (to the right).
92  * It must be greater than the element in the head of the list. */
fsl_push(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)93 int fsl_push(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
94     if (argc != 3)
95         return RedisModule_WrongArity(ctx);
96 
97     long long ele;
98     if (RedisModule_StringToLongLong(argv[2],&ele) != REDISMODULE_OK)
99         return RedisModule_ReplyWithError(ctx,"ERR invalid integer");
100 
101     fsl_t *fsl;
102     if (!get_fsl(ctx, argv[1], REDISMODULE_WRITE, 1, &fsl, 1))
103         return REDISMODULE_OK;
104 
105     if (fsl->length == LIST_SIZE)
106         return RedisModule_ReplyWithError(ctx,"ERR list is full");
107 
108     if (fsl->length != 0 && fsl->list[fsl->length-1] >= ele)
109         return RedisModule_ReplyWithError(ctx,"ERR new element has to be greater than the head element");
110 
111     fsl->list[fsl->length++] = ele;
112     RedisModule_SignalKeyAsReady(ctx, argv[1]);
113 
114     return RedisModule_ReplyWithSimpleString(ctx, "OK");
115 }
116 
bpop_reply_callback(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)117 int bpop_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
118     REDISMODULE_NOT_USED(argv);
119     REDISMODULE_NOT_USED(argc);
120     RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx);
121 
122     fsl_t *fsl;
123     if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0) || !fsl)
124         return REDISMODULE_ERR;
125 
126     RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
127     return REDISMODULE_OK;
128 }
129 
bpop_timeout_callback(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)130 int bpop_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
131     REDISMODULE_NOT_USED(argv);
132     REDISMODULE_NOT_USED(argc);
133     return RedisModule_ReplyWithSimpleString(ctx, "Request timedout");
134 }
135 
136 /* FSL.BPOP <key> <timeout> - Block clients until list has two or more elements.
137  * When that happens, unblock client and pop the last two elements (from the right). */
fsl_bpop(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)138 int fsl_bpop(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
139     if (argc != 3)
140         return RedisModule_WrongArity(ctx);
141 
142     long long timeout;
143     if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK || timeout < 0)
144         return RedisModule_ReplyWithError(ctx,"ERR invalid timeout");
145 
146     fsl_t *fsl;
147     if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1))
148         return REDISMODULE_OK;
149 
150     if (!fsl) {
151         RedisModule_BlockClientOnKeys(ctx, bpop_reply_callback, bpop_timeout_callback,
152                                       NULL, timeout, &argv[1], 1, NULL);
153     } else {
154         RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
155     }
156 
157     return REDISMODULE_OK;
158 }
159 
bpopgt_reply_callback(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)160 int bpopgt_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
161     REDISMODULE_NOT_USED(argv);
162     REDISMODULE_NOT_USED(argc);
163     RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx);
164     long long *pgt = RedisModule_GetBlockedClientPrivateData(ctx);
165 
166     fsl_t *fsl;
167     if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0) || !fsl)
168         return REDISMODULE_ERR;
169 
170     if (fsl->list[fsl->length-1] <= *pgt)
171         return REDISMODULE_ERR;
172 
173     RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
174     return REDISMODULE_OK;
175 }
176 
bpopgt_timeout_callback(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)177 int bpopgt_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
178     REDISMODULE_NOT_USED(argv);
179     REDISMODULE_NOT_USED(argc);
180     return RedisModule_ReplyWithSimpleString(ctx, "Request timedout");
181 }
182 
bpopgt_free_privdata(RedisModuleCtx * ctx,void * privdata)183 void bpopgt_free_privdata(RedisModuleCtx *ctx, void *privdata) {
184     REDISMODULE_NOT_USED(ctx);
185     RedisModule_Free(privdata);
186 }
187 
188 /* FSL.BPOPGT <key> <gt> <timeout> - Block clients until list has an element greater than <gt>.
189  * When that happens, unblock client and pop the last element (from the right). */
fsl_bpopgt(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)190 int fsl_bpopgt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
191     if (argc != 4)
192         return RedisModule_WrongArity(ctx);
193 
194     long long gt;
195     if (RedisModule_StringToLongLong(argv[2],&gt) != REDISMODULE_OK)
196         return RedisModule_ReplyWithError(ctx,"ERR invalid integer");
197 
198     long long timeout;
199     if (RedisModule_StringToLongLong(argv[3],&timeout) != REDISMODULE_OK || timeout < 0)
200         return RedisModule_ReplyWithError(ctx,"ERR invalid timeout");
201 
202     fsl_t *fsl;
203     if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1))
204         return REDISMODULE_OK;
205 
206     if (!fsl || fsl->list[fsl->length-1] <= gt) {
207         /* We use malloc so the tests in blockedonkeys.tcl can check for memory leaks */
208         long long *pgt = RedisModule_Alloc(sizeof(long long));
209         *pgt = gt;
210         RedisModule_BlockClientOnKeys(ctx, bpopgt_reply_callback, bpopgt_timeout_callback,
211                                       bpopgt_free_privdata, timeout, &argv[1], 1, pgt);
212     } else {
213         RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
214     }
215 
216     return REDISMODULE_OK;
217 }
218 
bpoppush_reply_callback(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)219 int bpoppush_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
220     REDISMODULE_NOT_USED(argv);
221     REDISMODULE_NOT_USED(argc);
222     RedisModuleString *src_keyname = RedisModule_GetBlockedClientReadyKey(ctx);
223     RedisModuleString *dst_keyname = RedisModule_GetBlockedClientPrivateData(ctx);
224 
225     fsl_t *src;
226     if (!get_fsl(ctx, src_keyname, REDISMODULE_READ, 0, &src, 0) || !src)
227         return REDISMODULE_ERR;
228 
229     fsl_t *dst;
230     if (!get_fsl(ctx, dst_keyname, REDISMODULE_WRITE, 1, &dst, 0) || !dst)
231         return REDISMODULE_ERR;
232 
233     long long ele = src->list[--src->length];
234     dst->list[dst->length++] = ele;
235     RedisModule_SignalKeyAsReady(ctx, dst_keyname);
236     return RedisModule_ReplyWithLongLong(ctx, ele);
237 }
238 
bpoppush_timeout_callback(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)239 int bpoppush_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
240     REDISMODULE_NOT_USED(argv);
241     REDISMODULE_NOT_USED(argc);
242     return RedisModule_ReplyWithSimpleString(ctx, "Request timedout");
243 }
244 
bpoppush_free_privdata(RedisModuleCtx * ctx,void * privdata)245 void bpoppush_free_privdata(RedisModuleCtx *ctx, void *privdata) {
246     RedisModule_FreeString(ctx, privdata);
247 }
248 
249 /* FSL.BPOPPUSH <src> <dst> <timeout> - Block clients until <src> has an element.
250  * When that happens, unblock client, pop the last element from <src> and push it to <dst>
251  * (from the right). */
fsl_bpoppush(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)252 int fsl_bpoppush(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
253     if (argc != 4)
254         return RedisModule_WrongArity(ctx);
255 
256     long long timeout;
257     if (RedisModule_StringToLongLong(argv[3],&timeout) != REDISMODULE_OK || timeout < 0)
258         return RedisModule_ReplyWithError(ctx,"ERR invalid timeout");
259 
260     fsl_t *src;
261     if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &src, 1))
262         return REDISMODULE_OK;
263 
264     if (!src) {
265         /* Retain string for reply callback */
266         RedisModule_RetainString(ctx, argv[2]);
267         /* Key is empty, we must block */
268         RedisModule_BlockClientOnKeys(ctx, bpoppush_reply_callback, bpoppush_timeout_callback,
269                                       bpoppush_free_privdata, timeout, &argv[1], 1, argv[2]);
270     } else {
271         fsl_t *dst;
272         if (!get_fsl(ctx, argv[2], REDISMODULE_WRITE, 1, &dst, 1))
273             return REDISMODULE_OK;
274         long long ele = src->list[--src->length];
275         dst->list[dst->length++] = ele;
276         RedisModule_SignalKeyAsReady(ctx, argv[2]);
277         RedisModule_ReplyWithLongLong(ctx, ele);
278     }
279 
280     return REDISMODULE_OK;
281 }
282 
283 /* FSL.GETALL <key> - Reply with an array containing all elements. */
fsl_getall(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)284 int fsl_getall(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
285     if (argc != 2)
286         return RedisModule_WrongArity(ctx);
287 
288     fsl_t *fsl;
289     if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1))
290         return REDISMODULE_OK;
291 
292     if (!fsl)
293         return RedisModule_ReplyWithArray(ctx, 0);
294 
295     RedisModule_ReplyWithArray(ctx, fsl->length);
296     for (int i = 0; i < fsl->length; i++)
297         RedisModule_ReplyWithLongLong(ctx, fsl->list[i]);
298     return REDISMODULE_OK;
299 }
300 
RedisModule_OnLoad(RedisModuleCtx * ctx,RedisModuleString ** argv,int argc)301 int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
302     REDISMODULE_NOT_USED(argv);
303     REDISMODULE_NOT_USED(argc);
304 
305     if (RedisModule_Init(ctx, "blockonkeys", 1, REDISMODULE_APIVER_1)== REDISMODULE_ERR)
306         return REDISMODULE_ERR;
307 
308     RedisModuleTypeMethods tm = {
309         .version = REDISMODULE_TYPE_METHOD_VERSION,
310         .rdb_load = fsl_rdb_load,
311         .rdb_save = fsl_rdb_save,
312         .aof_rewrite = fsl_aofrw,
313         .mem_usage = NULL,
314         .free = fsl_free,
315         .digest = NULL
316     };
317 
318     fsltype = RedisModule_CreateDataType(ctx, "fsltype_t", 0, &tm);
319     if (fsltype == NULL)
320         return REDISMODULE_ERR;
321 
322     if (RedisModule_CreateCommand(ctx,"fsl.push",fsl_push,"",0,0,0) == REDISMODULE_ERR)
323         return REDISMODULE_ERR;
324 
325     if (RedisModule_CreateCommand(ctx,"fsl.bpop",fsl_bpop,"",0,0,0) == REDISMODULE_ERR)
326         return REDISMODULE_ERR;
327 
328     if (RedisModule_CreateCommand(ctx,"fsl.bpopgt",fsl_bpopgt,"",0,0,0) == REDISMODULE_ERR)
329         return REDISMODULE_ERR;
330 
331     if (RedisModule_CreateCommand(ctx,"fsl.bpoppush",fsl_bpoppush,"",0,0,0) == REDISMODULE_ERR)
332         return REDISMODULE_ERR;
333 
334     if (RedisModule_CreateCommand(ctx,"fsl.getall",fsl_getall,"",0,0,0) == REDISMODULE_ERR)
335         return REDISMODULE_ERR;
336 
337     return REDISMODULE_OK;
338 }
339