1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 
3 /*  Chunk I/O
4  *  =========
5  *  Copyright 2018 Eduardo Silva <eduardo@monkey.io>
6  *
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  */
19 
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <chunkio/chunkio_compat.h>
24 
25 #include <chunkio/chunkio.h>
26 #include <chunkio/cio_os.h>
27 #include <chunkio/cio_log.h>
28 #include <chunkio/cio_chunk.h>
29 #include <chunkio/cio_stream.h>
30 #include <chunkio/cio_utils.h>
31 
32 #include <monkey/mk_core/mk_list.h>
33 
get_stream_path(struct cio_ctx * ctx,struct cio_stream * st)34 static char *get_stream_path(struct cio_ctx *ctx, struct cio_stream *st)
35 {
36     int ret;
37     int len;
38     char *p;
39 
40     /* Compose final path */
41     len = strlen(ctx->root_path) + strlen(st->name) + 2;
42     p = malloc(len + 1);
43     if (!p) {
44         cio_errno();
45         return NULL;
46     }
47 
48     ret = snprintf(p, len, "%s/%s", ctx->root_path, st->name);
49     if (ret == -1) {
50         cio_errno();
51         free(p);
52         return NULL;
53     }
54 
55     return p;
56 }
57 
check_stream_path(struct cio_ctx * ctx,const char * path)58 static int check_stream_path(struct cio_ctx *ctx, const char *path)
59 {
60     int ret;
61     int len;
62     char *p;
63 
64     /* Compose final path */
65     len = strlen(ctx->root_path) + strlen(path) + 2;
66     p = malloc(len + 1);
67     if (!p) {
68         cio_errno();
69         return -1;
70     }
71     ret = snprintf(p, len, "%s/%s", ctx->root_path, path);
72     if (ret == -1) {
73         cio_errno();
74         free(p);
75         return -1;
76     }
77 
78     ret = cio_os_isdir(p);
79     if (ret == -1) {
80         /* Try to create the path */
81         ret = cio_os_mkpath(p, 0755);
82         if (ret == -1) {
83             cio_log_error(ctx, "cannot create stream path %s", p);
84             free(p);
85             return -1;
86         }
87         cio_log_debug(ctx, "created stream path %s", p);
88         free(p);
89         return 0;
90     }
91 
92     /* Check write access and release*/
93     ret = access(p, W_OK);
94     free(p);
95     return ret;
96 }
97 
cio_stream_get(struct cio_ctx * ctx,const char * name)98 struct cio_stream *cio_stream_get(struct cio_ctx *ctx, const char *name)
99 {
100     struct mk_list *head;
101     struct cio_stream *st;
102 
103     mk_list_foreach(head, &ctx->streams) {
104         st = mk_list_entry(head, struct cio_stream, _head);
105         if (strcmp(st->name, name) == 0) {
106             return st;
107         }
108     }
109 
110     return NULL;
111 }
112 
cio_stream_create(struct cio_ctx * ctx,const char * name,int type)113 struct cio_stream *cio_stream_create(struct cio_ctx *ctx, const char *name,
114                                      int type)
115 {
116     int ret;
117     int len;
118     struct cio_stream *st;
119 
120     if (!name) {
121         cio_log_error(ctx, "[stream create] stream name not set");
122         return NULL;
123     }
124 
125     len = strlen(name);
126     if (len == 0) {
127         cio_log_error(ctx, "[stream create] invalid stream name");
128         return NULL;
129     }
130 
131     if (len == 1 && (name[0] == '.' || name[0] == '/')) {
132         cio_log_error(ctx, "[stream create] invalid stream name");
133         return NULL;
134     }
135 #ifndef CIO_HAVE_BACKEND_FILESYSTEM
136     if (type == CIO_STORE_FS) {
137         cio_log_error(ctx, "[stream create] file system backend not supported");
138         return NULL;
139     }
140 #endif
141 
142     /* Find duplicated */
143     st = cio_stream_get(ctx, name);
144     if (st) {
145         cio_log_error(ctx, "[cio stream] stream already registered: %s", name);
146         return NULL;
147     }
148 
149     /* If backend is the file system, validate the stream path */
150     if (type == CIO_STORE_FS) {
151         ret = check_stream_path(ctx, name);
152         if (ret == -1) {
153             return NULL;
154         }
155     }
156 
157     st = malloc(sizeof(struct cio_stream));
158     if (!st) {
159         cio_errno();
160         return NULL;
161     }
162     st->type = type;
163     st->name = strdup(name);
164     if (!st->name) {
165         cio_errno();
166         free(st);
167         return NULL;
168     }
169 
170     st->parent = ctx;
171     mk_list_init(&st->chunks);
172     mk_list_init(&st->chunks_up);
173     mk_list_init(&st->chunks_down);
174     mk_list_add(&st->_head, &ctx->streams);
175 
176     cio_log_debug(ctx, "[cio stream] new stream registered: %s", name);
177     return st;
178 }
179 
cio_stream_destroy(struct cio_stream * st)180 void cio_stream_destroy(struct cio_stream *st)
181 {
182     if (!st) {
183         return;
184     }
185     /* close all files */
186     cio_chunk_close_stream(st);
187 
188     /* destroy stream */
189     mk_list_del(&st->_head);
190     free(st->name);
191     free(st);
192 }
193 
194 /* Deletes a complete stream, this include all chunks available */
cio_stream_delete(struct cio_stream * st)195 int cio_stream_delete(struct cio_stream *st)
196 {
197     int ret;
198     char *path;
199     struct mk_list *tmp;
200     struct mk_list *head;
201     struct cio_chunk *ch;
202     struct cio_ctx *ctx;
203 
204     ctx = st->parent;
205 
206     /* delete all chunks */
207     mk_list_foreach_safe(head, tmp, &st->chunks) {
208         ch = mk_list_entry(head, struct cio_chunk, _head);
209         cio_chunk_close(ch, CIO_TRUE);
210     }
211 
212 #ifdef CIO_HAVE_BACKEND_FILESYSTEM
213     /* If the stream is filesystem based, destroy the real directory */
214     if (st->type == CIO_STORE_FS) {
215         path = get_stream_path(ctx, st);
216         if (!path) {
217             cio_log_error(ctx,
218                           "content from stream '%s' has been deleted, but the "
219                           "directory might still exists.", path);
220             return -1;
221         }
222 
223         cio_log_debug(ctx, "[cio stream] delete stream path: %s", path);
224 
225         /* Recursive deletion */
226         ret = cio_utils_recursive_delete(path);
227         if (ret == -1) {
228             cio_log_error(ctx, "error in recursive deletion of path %s", path);
229             free(path);
230             return -1;
231         }
232         free(path);
233 
234         return ret;
235     }
236 #endif
237 
238     return 0;
239 }
240 
cio_stream_destroy_all(struct cio_ctx * ctx)241 void cio_stream_destroy_all(struct cio_ctx *ctx)
242 {
243     struct mk_list *tmp;
244     struct mk_list *head;
245     struct cio_stream *st;
246 
247     if (!ctx) {
248         return;
249     }
250 
251     mk_list_foreach_safe(head, tmp, &ctx->streams) {
252         st = mk_list_entry(head, struct cio_stream, _head);
253         cio_stream_destroy(st);
254     }
255 }
256 
257 /* Return the total number of bytes being used by Chunks up in memory */
cio_stream_size_chunks_up(struct cio_stream * st)258 size_t cio_stream_size_chunks_up(struct cio_stream *st)
259 {
260     ssize_t bytes;
261     size_t total = 0;
262     struct cio_chunk *ch;
263     struct mk_list *head;
264 
265     mk_list_foreach(head, &st->chunks_up) {
266         ch = mk_list_entry(head, struct cio_chunk, _state_head);
267 
268         bytes = cio_chunk_get_content_size(ch);
269         if (bytes <= 0) {
270             continue;
271         }
272         total += bytes;
273     }
274 
275     return total;
276 }
277