1 /*
2 * Borg cache synchronizer,
3 * high level interface.
4 *
5 * These routines parse msgpacked item metadata and update a HashIndex
6 * with all chunks that are referenced from the items.
7 *
8 * This file only contains some initialization and buffer management.
9 *
10 * The parser is split in two parts, somewhat similar to lexer/parser combinations:
11 *
12 * unpack_template.h munches msgpack and calls a specific callback for each object
13 * encountered (e.g. beginning of a map, an integer, a string, a map item etc.).
14 *
15 * unpack.h implements these callbacks and uses another state machine to
16 * extract chunk references from it.
17 */
18
19 #include "unpack.h"
20
21 typedef struct {
22 unpack_context ctx;
23
24 char *buf;
25 size_t head;
26 size_t tail;
27 size_t size;
28 } CacheSyncCtx;
29
30 static CacheSyncCtx *
cache_sync_init(HashIndex * chunks)31 cache_sync_init(HashIndex *chunks)
32 {
33 CacheSyncCtx *ctx;
34 if (!(ctx = (CacheSyncCtx*)malloc(sizeof(CacheSyncCtx)))) {
35 return NULL;
36 }
37
38 unpack_init(&ctx->ctx);
39 /* needs to be set only once */
40 ctx->ctx.user.chunks = chunks;
41 ctx->ctx.user.parts.size = 0;
42 ctx->ctx.user.parts.csize = 0;
43 ctx->ctx.user.parts.num_files = 0;
44 ctx->ctx.user.totals.size = 0;
45 ctx->ctx.user.totals.csize = 0;
46 ctx->ctx.user.totals.num_files = 0;
47 ctx->buf = NULL;
48 ctx->head = 0;
49 ctx->tail = 0;
50 ctx->size = 0;
51
52 return ctx;
53 }
54
55 static void
cache_sync_free(CacheSyncCtx * ctx)56 cache_sync_free(CacheSyncCtx *ctx)
57 {
58 if(ctx->buf) {
59 free(ctx->buf);
60 }
61 free(ctx);
62 }
63
64 static const char *
cache_sync_error(const CacheSyncCtx * ctx)65 cache_sync_error(const CacheSyncCtx *ctx)
66 {
67 return ctx->ctx.user.last_error;
68 }
69
70 static uint64_t
cache_sync_num_files_totals(const CacheSyncCtx * ctx)71 cache_sync_num_files_totals(const CacheSyncCtx *ctx)
72 {
73 return ctx->ctx.user.totals.num_files;
74 }
75
76 static uint64_t
cache_sync_num_files_parts(const CacheSyncCtx * ctx)77 cache_sync_num_files_parts(const CacheSyncCtx *ctx)
78 {
79 return ctx->ctx.user.parts.num_files;
80 }
81
82 static uint64_t
cache_sync_size_totals(const CacheSyncCtx * ctx)83 cache_sync_size_totals(const CacheSyncCtx *ctx)
84 {
85 return ctx->ctx.user.totals.size;
86 }
87
88 static uint64_t
cache_sync_size_parts(const CacheSyncCtx * ctx)89 cache_sync_size_parts(const CacheSyncCtx *ctx)
90 {
91 return ctx->ctx.user.parts.size;
92 }
93
94 static uint64_t
cache_sync_csize_totals(const CacheSyncCtx * ctx)95 cache_sync_csize_totals(const CacheSyncCtx *ctx)
96 {
97 return ctx->ctx.user.totals.csize;
98 }
99
100 static uint64_t
cache_sync_csize_parts(const CacheSyncCtx * ctx)101 cache_sync_csize_parts(const CacheSyncCtx *ctx)
102 {
103 return ctx->ctx.user.parts.csize;
104 }
105
106 /**
107 * feed data to the cache synchronizer
108 * 0 = abort, 1 = continue
109 * abort is a regular condition, check cache_sync_error
110 */
111 static int
cache_sync_feed(CacheSyncCtx * ctx,void * data,uint32_t length)112 cache_sync_feed(CacheSyncCtx *ctx, void *data, uint32_t length)
113 {
114 size_t new_size;
115 int ret;
116 char *new_buf;
117
118 if(ctx->tail + length > ctx->size) {
119 if((ctx->tail - ctx->head) + length <= ctx->size) {
120 /* | XXXXX| -> move data in buffer backwards -> |XXXXX | */
121 memmove(ctx->buf, ctx->buf + ctx->head, ctx->tail - ctx->head);
122 ctx->tail -= ctx->head;
123 ctx->head = 0;
124 } else {
125 /* must expand buffer to fit all data */
126 new_size = (ctx->tail - ctx->head) + length;
127 new_buf = (char*) malloc(new_size);
128 if(!new_buf) {
129 ctx->ctx.user.last_error = "cache_sync_feed: unable to allocate buffer";
130 return 0;
131 }
132 if(ctx->buf) {
133 memcpy(new_buf, ctx->buf + ctx->head, ctx->tail - ctx->head);
134 free(ctx->buf);
135 }
136 ctx->buf = new_buf;
137 ctx->tail -= ctx->head;
138 ctx->head = 0;
139 ctx->size = new_size;
140 }
141 }
142
143 memcpy(ctx->buf + ctx->tail, data, length);
144 ctx->tail += length;
145
146 while(1) {
147 if(ctx->head >= ctx->tail) {
148 return 1; /* request more bytes */
149 }
150
151 ret = unpack_execute(&ctx->ctx, ctx->buf, ctx->tail, &ctx->head);
152 if(ret == 1) {
153 unpack_init(&ctx->ctx);
154 continue;
155 } else if(ret == 0) {
156 return 1;
157 } else {
158 if(!ctx->ctx.user.last_error) {
159 ctx->ctx.user.last_error = "Unknown error";
160 }
161 return 0;
162 }
163 }
164 /* unreachable */
165 return 1;
166 }
167