1 /*
2  * Borg cache synchronizer,
3  * high level interface.
4  *
5  * These routines parse msgpacked item metadata and update a HashIndex
6  * with all chunks that are referenced from the items.
7  *
8  * This file only contains some initialization and buffer management.
9  *
10  * The parser is split in two parts, somewhat similar to lexer/parser combinations:
11  *
12  * unpack_template.h munches msgpack and calls a specific callback for each object
13  * encountered (e.g. beginning of a map, an integer, a string, a map item etc.).
14  *
15  * unpack.h implements these callbacks and uses another state machine to
16  * extract chunk references from it.
17  */
18 
19 #include "unpack.h"
20 
21 typedef struct {
22     unpack_context ctx;
23 
24     char *buf;
25     size_t head;
26     size_t tail;
27     size_t size;
28 } CacheSyncCtx;
29 
30 static CacheSyncCtx *
cache_sync_init(HashIndex * chunks)31 cache_sync_init(HashIndex *chunks)
32 {
33     CacheSyncCtx *ctx;
34     if (!(ctx = (CacheSyncCtx*)malloc(sizeof(CacheSyncCtx)))) {
35         return NULL;
36     }
37 
38     unpack_init(&ctx->ctx);
39     /* needs to be set only once */
40     ctx->ctx.user.chunks = chunks;
41     ctx->ctx.user.parts.size = 0;
42     ctx->ctx.user.parts.csize = 0;
43     ctx->ctx.user.parts.num_files = 0;
44     ctx->ctx.user.totals.size = 0;
45     ctx->ctx.user.totals.csize = 0;
46     ctx->ctx.user.totals.num_files = 0;
47     ctx->buf = NULL;
48     ctx->head = 0;
49     ctx->tail = 0;
50     ctx->size = 0;
51 
52     return ctx;
53 }
54 
55 static void
cache_sync_free(CacheSyncCtx * ctx)56 cache_sync_free(CacheSyncCtx *ctx)
57 {
58     if(ctx->buf) {
59         free(ctx->buf);
60     }
61     free(ctx);
62 }
63 
64 static const char *
cache_sync_error(const CacheSyncCtx * ctx)65 cache_sync_error(const CacheSyncCtx *ctx)
66 {
67     return ctx->ctx.user.last_error;
68 }
69 
70 static uint64_t
cache_sync_num_files_totals(const CacheSyncCtx * ctx)71 cache_sync_num_files_totals(const CacheSyncCtx *ctx)
72 {
73     return ctx->ctx.user.totals.num_files;
74 }
75 
76 static uint64_t
cache_sync_num_files_parts(const CacheSyncCtx * ctx)77 cache_sync_num_files_parts(const CacheSyncCtx *ctx)
78 {
79     return ctx->ctx.user.parts.num_files;
80 }
81 
82 static uint64_t
cache_sync_size_totals(const CacheSyncCtx * ctx)83 cache_sync_size_totals(const CacheSyncCtx *ctx)
84 {
85     return ctx->ctx.user.totals.size;
86 }
87 
88 static uint64_t
cache_sync_size_parts(const CacheSyncCtx * ctx)89 cache_sync_size_parts(const CacheSyncCtx *ctx)
90 {
91     return ctx->ctx.user.parts.size;
92 }
93 
94 static uint64_t
cache_sync_csize_totals(const CacheSyncCtx * ctx)95 cache_sync_csize_totals(const CacheSyncCtx *ctx)
96 {
97     return ctx->ctx.user.totals.csize;
98 }
99 
100 static uint64_t
cache_sync_csize_parts(const CacheSyncCtx * ctx)101 cache_sync_csize_parts(const CacheSyncCtx *ctx)
102 {
103     return ctx->ctx.user.parts.csize;
104 }
105 
106 /**
107  * feed data to the cache synchronizer
108  * 0 = abort, 1 = continue
109  * abort is a regular condition, check cache_sync_error
110  */
111 static int
cache_sync_feed(CacheSyncCtx * ctx,void * data,uint32_t length)112 cache_sync_feed(CacheSyncCtx *ctx, void *data, uint32_t length)
113 {
114     size_t new_size;
115     int ret;
116     char *new_buf;
117 
118     if(ctx->tail + length > ctx->size) {
119         if((ctx->tail - ctx->head) + length <= ctx->size) {
120             /* |  XXXXX| -> move data in buffer backwards -> |XXXXX  | */
121             memmove(ctx->buf, ctx->buf + ctx->head, ctx->tail - ctx->head);
122             ctx->tail -= ctx->head;
123             ctx->head = 0;
124         } else {
125             /* must expand buffer to fit all data */
126             new_size = (ctx->tail - ctx->head) + length;
127             new_buf = (char*) malloc(new_size);
128             if(!new_buf) {
129                 ctx->ctx.user.last_error = "cache_sync_feed: unable to allocate buffer";
130                 return 0;
131             }
132             if(ctx->buf) {
133                 memcpy(new_buf, ctx->buf + ctx->head, ctx->tail - ctx->head);
134                 free(ctx->buf);
135             }
136             ctx->buf = new_buf;
137             ctx->tail -= ctx->head;
138             ctx->head = 0;
139             ctx->size = new_size;
140         }
141     }
142 
143     memcpy(ctx->buf + ctx->tail, data, length);
144     ctx->tail += length;
145 
146     while(1) {
147         if(ctx->head >= ctx->tail) {
148             return 1;  /* request more bytes */
149         }
150 
151         ret = unpack_execute(&ctx->ctx, ctx->buf, ctx->tail, &ctx->head);
152         if(ret == 1) {
153             unpack_init(&ctx->ctx);
154             continue;
155         } else if(ret == 0) {
156             return 1;
157         } else {
158             if(!ctx->ctx.user.last_error) {
159                 ctx->ctx.user.last_error = "Unknown error";
160             }
161             return 0;
162         }
163     }
164     /* unreachable */
165     return 1;
166 }
167