1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3 /* Chunk I/O
4 * =========
5 * Copyright 2018 Eduardo Silva <eduardo@monkey.io>
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 */
19
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <chunkio/chunkio_compat.h>
24
25 #include <chunkio/chunkio.h>
26 #include <chunkio/cio_os.h>
27 #include <chunkio/cio_log.h>
28 #include <chunkio/cio_chunk.h>
29 #include <chunkio/cio_stream.h>
30 #include <chunkio/cio_utils.h>
31
32 #include <monkey/mk_core/mk_list.h>
33
get_stream_path(struct cio_ctx * ctx,struct cio_stream * st)34 static char *get_stream_path(struct cio_ctx *ctx, struct cio_stream *st)
35 {
36 int ret;
37 int len;
38 char *p;
39
40 /* Compose final path */
41 len = strlen(ctx->root_path) + strlen(st->name) + 2;
42 p = malloc(len + 1);
43 if (!p) {
44 cio_errno();
45 return NULL;
46 }
47
48 ret = snprintf(p, len, "%s/%s", ctx->root_path, st->name);
49 if (ret == -1) {
50 cio_errno();
51 free(p);
52 return NULL;
53 }
54
55 return p;
56 }
57
check_stream_path(struct cio_ctx * ctx,const char * path)58 static int check_stream_path(struct cio_ctx *ctx, const char *path)
59 {
60 int ret;
61 int len;
62 char *p;
63
64 /* Compose final path */
65 len = strlen(ctx->root_path) + strlen(path) + 2;
66 p = malloc(len + 1);
67 if (!p) {
68 cio_errno();
69 return -1;
70 }
71 ret = snprintf(p, len, "%s/%s", ctx->root_path, path);
72 if (ret == -1) {
73 cio_errno();
74 free(p);
75 return -1;
76 }
77
78 ret = cio_os_isdir(p);
79 if (ret == -1) {
80 /* Try to create the path */
81 ret = cio_os_mkpath(p, 0755);
82 if (ret == -1) {
83 cio_log_error(ctx, "cannot create stream path %s", p);
84 free(p);
85 return -1;
86 }
87 cio_log_debug(ctx, "created stream path %s", p);
88 free(p);
89 return 0;
90 }
91
92 /* Check write access and release*/
93 ret = access(p, W_OK);
94 free(p);
95 return ret;
96 }
97
cio_stream_get(struct cio_ctx * ctx,const char * name)98 struct cio_stream *cio_stream_get(struct cio_ctx *ctx, const char *name)
99 {
100 struct mk_list *head;
101 struct cio_stream *st;
102
103 mk_list_foreach(head, &ctx->streams) {
104 st = mk_list_entry(head, struct cio_stream, _head);
105 if (strcmp(st->name, name) == 0) {
106 return st;
107 }
108 }
109
110 return NULL;
111 }
112
cio_stream_create(struct cio_ctx * ctx,const char * name,int type)113 struct cio_stream *cio_stream_create(struct cio_ctx *ctx, const char *name,
114 int type)
115 {
116 int ret;
117 int len;
118 struct cio_stream *st;
119
120 if (!name) {
121 cio_log_error(ctx, "[stream create] stream name not set");
122 return NULL;
123 }
124
125 len = strlen(name);
126 if (len == 0) {
127 cio_log_error(ctx, "[stream create] invalid stream name");
128 return NULL;
129 }
130
131 if (len == 1 && (name[0] == '.' || name[0] == '/')) {
132 cio_log_error(ctx, "[stream create] invalid stream name");
133 return NULL;
134 }
135 #ifndef CIO_HAVE_BACKEND_FILESYSTEM
136 if (type == CIO_STORE_FS) {
137 cio_log_error(ctx, "[stream create] file system backend not supported");
138 return NULL;
139 }
140 #endif
141
142 /* Find duplicated */
143 st = cio_stream_get(ctx, name);
144 if (st) {
145 cio_log_error(ctx, "[cio stream] stream already registered: %s", name);
146 return NULL;
147 }
148
149 /* If backend is the file system, validate the stream path */
150 if (type == CIO_STORE_FS) {
151 ret = check_stream_path(ctx, name);
152 if (ret == -1) {
153 return NULL;
154 }
155 }
156
157 st = malloc(sizeof(struct cio_stream));
158 if (!st) {
159 cio_errno();
160 return NULL;
161 }
162 st->type = type;
163 st->name = strdup(name);
164 if (!st->name) {
165 cio_errno();
166 free(st);
167 return NULL;
168 }
169
170 st->parent = ctx;
171 mk_list_init(&st->chunks);
172 mk_list_init(&st->chunks_up);
173 mk_list_init(&st->chunks_down);
174 mk_list_add(&st->_head, &ctx->streams);
175
176 cio_log_debug(ctx, "[cio stream] new stream registered: %s", name);
177 return st;
178 }
179
cio_stream_destroy(struct cio_stream * st)180 void cio_stream_destroy(struct cio_stream *st)
181 {
182 if (!st) {
183 return;
184 }
185 /* close all files */
186 cio_chunk_close_stream(st);
187
188 /* destroy stream */
189 mk_list_del(&st->_head);
190 free(st->name);
191 free(st);
192 }
193
194 /* Deletes a complete stream, this include all chunks available */
cio_stream_delete(struct cio_stream * st)195 int cio_stream_delete(struct cio_stream *st)
196 {
197 int ret;
198 char *path;
199 struct mk_list *tmp;
200 struct mk_list *head;
201 struct cio_chunk *ch;
202 struct cio_ctx *ctx;
203
204 ctx = st->parent;
205
206 /* delete all chunks */
207 mk_list_foreach_safe(head, tmp, &st->chunks) {
208 ch = mk_list_entry(head, struct cio_chunk, _head);
209 cio_chunk_close(ch, CIO_TRUE);
210 }
211
212 #ifdef CIO_HAVE_BACKEND_FILESYSTEM
213 /* If the stream is filesystem based, destroy the real directory */
214 if (st->type == CIO_STORE_FS) {
215 path = get_stream_path(ctx, st);
216 if (!path) {
217 cio_log_error(ctx,
218 "content from stream '%s' has been deleted, but the "
219 "directory might still exists.", path);
220 return -1;
221 }
222
223 cio_log_debug(ctx, "[cio stream] delete stream path: %s", path);
224
225 /* Recursive deletion */
226 ret = cio_utils_recursive_delete(path);
227 if (ret == -1) {
228 cio_log_error(ctx, "error in recursive deletion of path %s", path);
229 free(path);
230 return -1;
231 }
232 free(path);
233
234 return ret;
235 }
236 #endif
237
238 return 0;
239 }
240
cio_stream_destroy_all(struct cio_ctx * ctx)241 void cio_stream_destroy_all(struct cio_ctx *ctx)
242 {
243 struct mk_list *tmp;
244 struct mk_list *head;
245 struct cio_stream *st;
246
247 if (!ctx) {
248 return;
249 }
250
251 mk_list_foreach_safe(head, tmp, &ctx->streams) {
252 st = mk_list_entry(head, struct cio_stream, _head);
253 cio_stream_destroy(st);
254 }
255 }
256
257 /* Return the total number of bytes being used by Chunks up in memory */
cio_stream_size_chunks_up(struct cio_stream * st)258 size_t cio_stream_size_chunks_up(struct cio_stream *st)
259 {
260 ssize_t bytes;
261 size_t total = 0;
262 struct cio_chunk *ch;
263 struct mk_list *head;
264
265 mk_list_foreach(head, &st->chunks_up) {
266 ch = mk_list_entry(head, struct cio_chunk, _state_head);
267
268 bytes = cio_chunk_get_content_size(ch);
269 if (bytes <= 0) {
270 continue;
271 }
272 total += bytes;
273 }
274
275 return total;
276 }
277