1 #include "common.h"
2 #include "log.h"
3 #include "utils.h"
4 #include "mq-mgr.h"
5 
6 typedef struct SeafMqManagerPriv {
7     // chan <-> async_queue
8     GHashTable *chans;
9 } SeafMqManagerPriv;
10 
11 SeafMqManager *
seaf_mq_manager_new()12 seaf_mq_manager_new ()
13 {
14     SeafMqManager *mgr = g_new0 (SeafMqManager, 1);
15     mgr->priv = g_new0 (SeafMqManagerPriv, 1);
16     mgr->priv->chans = g_hash_table_new_full (g_str_hash, g_str_equal,
17                                               (GDestroyNotify)g_free,
18                                               (GDestroyNotify)g_async_queue_unref);
19 
20     return mgr;
21 }
22 
23 static GAsyncQueue *
seaf_mq_manager_channel_new(SeafMqManager * mgr,const char * channel)24 seaf_mq_manager_channel_new (SeafMqManager *mgr, const char *channel)
25 {
26     GAsyncQueue *async_queue = NULL;
27     async_queue = g_async_queue_new_full ((GDestroyNotify)json_decref);
28 
29     g_hash_table_replace (mgr->priv->chans, g_strdup (channel), async_queue);
30 
31     return async_queue;
32 }
33 
34 int
seaf_mq_manager_publish_event(SeafMqManager * mgr,const char * channel,const char * content)35 seaf_mq_manager_publish_event (SeafMqManager *mgr, const char *channel, const char *content)
36 {
37     int ret = 0;
38 
39     if (!channel || !content) {
40         seaf_warning ("type and content should not be NULL.\n");
41         return -1;
42     }
43 
44     GAsyncQueue *async_queue = g_hash_table_lookup (mgr->priv->chans, channel);
45     if (!async_queue) {
46         async_queue = seaf_mq_manager_channel_new(mgr, channel);
47     }
48 
49     if (!async_queue) {
50         seaf_warning("%s channel creation failed.\n", channel);
51         return -1;
52     }
53 
54     json_t *msg = json_object();
55     json_object_set_new (msg, "content", json_string(content));
56     json_object_set_new (msg, "ctime", json_integer(time(NULL)));
57     g_async_queue_push (async_queue, msg);
58 
59     return ret;
60 }
61 
62 json_t *
seaf_mq_manager_pop_event(SeafMqManager * mgr,const char * channel)63 seaf_mq_manager_pop_event (SeafMqManager *mgr, const char *channel)
64 {
65     GAsyncQueue *async_queue = g_hash_table_lookup (mgr->priv->chans, channel);
66     if (!async_queue)
67         return NULL;
68 
69     return g_async_queue_try_pop (async_queue);
70 }
71