1 #include "common.h"
2 #include "log.h"
3 #include "obj-backend.h"
4 
5 #ifdef RIAK_BACKEND
6 
7 #include "riak-client.h"
8 
9 #include <pthread.h>
10 
11 typedef struct RiakPriv {
12     const char *host;
13     const char *port;
14     const char *bucket;
15     int n_write;
16 
17     GQueue *conn_pool;
18     pthread_mutex_t lock;
19 } RiakPriv;
20 
21 static SeafRiakClient *
get_connection(RiakPriv * priv)22 get_connection (RiakPriv *priv)
23 {
24     SeafRiakClient *connection;
25 
26     pthread_mutex_lock (&priv->lock);
27 
28     connection = g_queue_pop_head (priv->conn_pool);
29     if (!connection)
30         connection = seaf_riak_client_new (priv->host, priv->port);
31     pthread_mutex_unlock (&priv->lock);
32     return connection;
33 }
34 
35 static void
return_connection(RiakPriv * priv,SeafRiakClient * connection)36 return_connection (RiakPriv *priv, SeafRiakClient *connection)
37 {
38     pthread_mutex_lock (&priv->lock);
39     g_queue_push_tail (priv->conn_pool, connection);
40     pthread_mutex_unlock (&priv->lock);
41 }
42 
43 static int
obj_backend_riak_read(ObjBackend * bend,const char * obj_id,void ** data,int * len)44 obj_backend_riak_read (ObjBackend *bend,
45                        const char *obj_id,
46                        void **data,
47                        int *len)
48 {
49     SeafRiakClient *conn = get_connection (bend->priv);
50     RiakPriv *priv = bend->priv;
51     int ret;
52 
53     ret = seaf_riak_client_get (conn, priv->bucket, obj_id, data, len);
54 
55     return_connection (priv, conn);
56     return ret;
57 }
58 
59 static int
obj_backend_riak_write(ObjBackend * bend,const char * obj_id,void * data,int len)60 obj_backend_riak_write (ObjBackend *bend,
61                         const char *obj_id,
62                         void *data,
63                         int len)
64 {
65     SeafRiakClient *conn = get_connection (bend->priv);
66     RiakPriv *priv = bend->priv;
67     int ret;
68 
69     ret = seaf_riak_client_put (conn, priv->bucket, obj_id, data, len,
70                                 priv->n_write);
71 
72     return_connection (priv, conn);
73     return ret;
74 }
75 
76 static gboolean
obj_backend_riak_exists(ObjBackend * bend,const char * obj_id)77 obj_backend_riak_exists (ObjBackend *bend,
78                          const char *obj_id)
79 {
80     SeafRiakClient *conn = get_connection (bend->priv);
81     RiakPriv *priv = bend->priv;
82     gboolean ret;
83 
84     ret = seaf_riak_client_query (conn, priv->bucket, obj_id);
85 
86     return_connection (priv, conn);
87     return ret;
88 }
89 
90 static void
obj_backend_riak_delete(ObjBackend * bend,const char * obj_id)91 obj_backend_riak_delete (ObjBackend *bend,
92                          const char *obj_id)
93 {
94     SeafRiakClient *conn = get_connection (bend->priv);
95     RiakPriv *priv = bend->priv;
96 
97     seaf_riak_client_delete (conn, priv->bucket, obj_id, priv->n_write);
98 
99     return_connection (priv, conn);
100 }
101 
102 ObjBackend *
obj_backend_riak_new(const char * host,const char * port,const char * bucket,const char * write_policy)103 obj_backend_riak_new (const char *host,
104                       const char *port,
105                       const char *bucket,
106                       const char *write_policy)
107 {
108     ObjBackend *bend;
109     RiakPriv *priv;
110 
111     bend = g_new0(ObjBackend, 1);
112     priv = g_new0(RiakPriv, 1);
113     bend->priv = priv;
114 
115     priv->host = g_strdup (host);
116     priv->port = g_strdup (port);
117     priv->bucket = g_strdup (bucket);
118     if (strcmp (write_policy, "quorum") == 0)
119         priv->n_write = RIAK_QUORUM;
120     else if (strcmp (write_policy, "all") == 0)
121         priv->n_write = RIAK_ALL;
122     else
123         g_return_val_if_reached (NULL);
124 
125     priv->conn_pool = g_queue_new ();
126     pthread_mutex_init (&priv->lock, NULL);
127 
128     bend->read = obj_backend_riak_read;
129     bend->write = obj_backend_riak_write;
130     bend->exists = obj_backend_riak_exists;
131     bend->delete = obj_backend_riak_delete;
132 
133     return bend;
134 }
135 
136 #else
137 
138 ObjBackend *
obj_backend_riak_new(const char * host,const char * port,const char * bucket,const char * write_policy)139 obj_backend_riak_new (const char *host,
140                       const char *port,
141                       const char *bucket,
142                       const char *write_policy)
143 {
144     seaf_warning ("Riak backend is not enabled.\n");
145     return NULL;
146 }
147 
148 #endif  /* RIAK_BACKEND */
149