1 #include "common.h"
2 #include "log.h"
3 #include "obj-backend.h"
4
5 #ifdef RIAK_BACKEND
6
7 #include "riak-client.h"
8
9 #include <pthread.h>
10
11 typedef struct RiakPriv {
12 const char *host;
13 const char *port;
14 const char *bucket;
15 int n_write;
16
17 GQueue *conn_pool;
18 pthread_mutex_t lock;
19 } RiakPriv;
20
21 static SeafRiakClient *
get_connection(RiakPriv * priv)22 get_connection (RiakPriv *priv)
23 {
24 SeafRiakClient *connection;
25
26 pthread_mutex_lock (&priv->lock);
27
28 connection = g_queue_pop_head (priv->conn_pool);
29 if (!connection)
30 connection = seaf_riak_client_new (priv->host, priv->port);
31 pthread_mutex_unlock (&priv->lock);
32 return connection;
33 }
34
35 static void
return_connection(RiakPriv * priv,SeafRiakClient * connection)36 return_connection (RiakPriv *priv, SeafRiakClient *connection)
37 {
38 pthread_mutex_lock (&priv->lock);
39 g_queue_push_tail (priv->conn_pool, connection);
40 pthread_mutex_unlock (&priv->lock);
41 }
42
43 static int
obj_backend_riak_read(ObjBackend * bend,const char * obj_id,void ** data,int * len)44 obj_backend_riak_read (ObjBackend *bend,
45 const char *obj_id,
46 void **data,
47 int *len)
48 {
49 SeafRiakClient *conn = get_connection (bend->priv);
50 RiakPriv *priv = bend->priv;
51 int ret;
52
53 ret = seaf_riak_client_get (conn, priv->bucket, obj_id, data, len);
54
55 return_connection (priv, conn);
56 return ret;
57 }
58
59 static int
obj_backend_riak_write(ObjBackend * bend,const char * obj_id,void * data,int len)60 obj_backend_riak_write (ObjBackend *bend,
61 const char *obj_id,
62 void *data,
63 int len)
64 {
65 SeafRiakClient *conn = get_connection (bend->priv);
66 RiakPriv *priv = bend->priv;
67 int ret;
68
69 ret = seaf_riak_client_put (conn, priv->bucket, obj_id, data, len,
70 priv->n_write);
71
72 return_connection (priv, conn);
73 return ret;
74 }
75
76 static gboolean
obj_backend_riak_exists(ObjBackend * bend,const char * obj_id)77 obj_backend_riak_exists (ObjBackend *bend,
78 const char *obj_id)
79 {
80 SeafRiakClient *conn = get_connection (bend->priv);
81 RiakPriv *priv = bend->priv;
82 gboolean ret;
83
84 ret = seaf_riak_client_query (conn, priv->bucket, obj_id);
85
86 return_connection (priv, conn);
87 return ret;
88 }
89
90 static void
obj_backend_riak_delete(ObjBackend * bend,const char * obj_id)91 obj_backend_riak_delete (ObjBackend *bend,
92 const char *obj_id)
93 {
94 SeafRiakClient *conn = get_connection (bend->priv);
95 RiakPriv *priv = bend->priv;
96
97 seaf_riak_client_delete (conn, priv->bucket, obj_id, priv->n_write);
98
99 return_connection (priv, conn);
100 }
101
102 ObjBackend *
obj_backend_riak_new(const char * host,const char * port,const char * bucket,const char * write_policy)103 obj_backend_riak_new (const char *host,
104 const char *port,
105 const char *bucket,
106 const char *write_policy)
107 {
108 ObjBackend *bend;
109 RiakPriv *priv;
110
111 bend = g_new0(ObjBackend, 1);
112 priv = g_new0(RiakPriv, 1);
113 bend->priv = priv;
114
115 priv->host = g_strdup (host);
116 priv->port = g_strdup (port);
117 priv->bucket = g_strdup (bucket);
118 if (strcmp (write_policy, "quorum") == 0)
119 priv->n_write = RIAK_QUORUM;
120 else if (strcmp (write_policy, "all") == 0)
121 priv->n_write = RIAK_ALL;
122 else
123 g_return_val_if_reached (NULL);
124
125 priv->conn_pool = g_queue_new ();
126 pthread_mutex_init (&priv->lock, NULL);
127
128 bend->read = obj_backend_riak_read;
129 bend->write = obj_backend_riak_write;
130 bend->exists = obj_backend_riak_exists;
131 bend->delete = obj_backend_riak_delete;
132
133 return bend;
134 }
135
136 #else
137
138 ObjBackend *
obj_backend_riak_new(const char * host,const char * port,const char * bucket,const char * write_policy)139 obj_backend_riak_new (const char *host,
140 const char *port,
141 const char *bucket,
142 const char *write_policy)
143 {
144 seaf_warning ("Riak backend is not enabled.\n");
145 return NULL;
146 }
147
148 #endif /* RIAK_BACKEND */
149