1 #include "common.h"
2 #include "log.h"
3 #include "utils.h"
4 #include "mq-mgr.h"
5
// Private state of a SeafMqManager.
typedef struct SeafMqManagerPriv {
    // Maps a channel name (string key) to the GAsyncQueue holding the
    // messages that have been published on that channel but not yet popped.
    GHashTable *chans;
} SeafMqManagerPriv;
10
11 SeafMqManager *
seaf_mq_manager_new()12 seaf_mq_manager_new ()
13 {
14 SeafMqManager *mgr = g_new0 (SeafMqManager, 1);
15 mgr->priv = g_new0 (SeafMqManagerPriv, 1);
16 mgr->priv->chans = g_hash_table_new_full (g_str_hash, g_str_equal,
17 (GDestroyNotify)g_free,
18 (GDestroyNotify)g_async_queue_unref);
19
20 return mgr;
21 }
22
23 static GAsyncQueue *
seaf_mq_manager_channel_new(SeafMqManager * mgr,const char * channel)24 seaf_mq_manager_channel_new (SeafMqManager *mgr, const char *channel)
25 {
26 GAsyncQueue *async_queue = NULL;
27 async_queue = g_async_queue_new_full ((GDestroyNotify)json_decref);
28
29 g_hash_table_replace (mgr->priv->chans, g_strdup (channel), async_queue);
30
31 return async_queue;
32 }
33
34 int
seaf_mq_manager_publish_event(SeafMqManager * mgr,const char * channel,const char * content)35 seaf_mq_manager_publish_event (SeafMqManager *mgr, const char *channel, const char *content)
36 {
37 int ret = 0;
38
39 if (!channel || !content) {
40 seaf_warning ("type and content should not be NULL.\n");
41 return -1;
42 }
43
44 GAsyncQueue *async_queue = g_hash_table_lookup (mgr->priv->chans, channel);
45 if (!async_queue) {
46 async_queue = seaf_mq_manager_channel_new(mgr, channel);
47 }
48
49 if (!async_queue) {
50 seaf_warning("%s channel creation failed.\n", channel);
51 return -1;
52 }
53
54 json_t *msg = json_object();
55 json_object_set_new (msg, "content", json_string(content));
56 json_object_set_new (msg, "ctime", json_integer(time(NULL)));
57 g_async_queue_push (async_queue, msg);
58
59 return ret;
60 }
61
62 json_t *
seaf_mq_manager_pop_event(SeafMqManager * mgr,const char * channel)63 seaf_mq_manager_pop_event (SeafMqManager *mgr, const char *channel)
64 {
65 GAsyncQueue *async_queue = g_hash_table_lookup (mgr->priv->chans, channel);
66 if (!async_queue)
67 return NULL;
68
69 return g_async_queue_try_pop (async_queue);
70 }
71