1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2007, 2009, 2011 Free Software Foundation, Inc.
3
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
16
/* This casewindow implementation in terms of a class interface
   is undoubtedly a form of over-abstraction.  However, it works
   and the extra abstraction seems to be harmless. */
20
21 #include <config.h>
22
23 #include "data/casewindow.h"
24
25 #include <stdlib.h>
26
27 #include "data/case-tmpfile.h"
28 #include "libpspp/assertion.h"
29 #include "libpspp/compiler.h"
30 #include "libpspp/deque.h"
31 #include "libpspp/taint.h"
32
33 #include "gl/xalloc.h"
34
35 /* A queue of cases. */
36 struct casewindow
37 {
38 /* Common data. */
39 struct caseproto *proto; /* Prototype of cases in window. */
40 casenumber max_in_core_cases; /* Max cases before dumping to disk. */
41 struct taint *taint; /* Taint status. */
42
43 /* Implementation data. */
44 const struct casewindow_class *class;
45 void *aux;
46 };
47
48 /* Implementation of a casewindow. */
49 struct casewindow_class
50 {
51 void *(*create) (struct taint *, const struct caseproto *);
52 void (*destroy) (void *aux);
53 void (*push_head) (void *aux, struct ccase *);
54 void (*pop_tail) (void *aux, casenumber cnt);
55 struct ccase *(*get_case) (void *aux, casenumber ofs);
56 casenumber (*get_case_cnt) (const void *aux);
57 };
58
59 /* Classes. */
60 static const struct casewindow_class casewindow_memory_class;
61 static const struct casewindow_class casewindow_file_class;
62
63 /* Creates and returns a new casewindow using the given
64 parameters. */
65 static struct casewindow *
do_casewindow_create(struct taint * taint,const struct caseproto * proto,casenumber max_in_core_cases)66 do_casewindow_create (struct taint *taint, const struct caseproto *proto,
67 casenumber max_in_core_cases)
68 {
69 struct casewindow *cw = xmalloc (sizeof *cw);
70 cw->class = (max_in_core_cases
71 ? &casewindow_memory_class
72 : &casewindow_file_class);
73 cw->aux = cw->class->create (taint, proto);
74 cw->proto = caseproto_ref (proto);
75 cw->max_in_core_cases = max_in_core_cases;
76 cw->taint = taint;
77 return cw;
78 }
79
80 /* Creates and returns a new casewindow for cases that take the
81 form specified by PROTO. If the casewindow holds more than
82 MAX_IN_CORE_CASES cases at any time, its cases will be dumped
83 to disk; otherwise, its cases will be held in memory.
84
85 The caller retains its reference to PROTO. */
86 struct casewindow *
casewindow_create(const struct caseproto * proto,casenumber max_in_core_cases)87 casewindow_create (const struct caseproto *proto, casenumber max_in_core_cases)
88 {
89 return do_casewindow_create (taint_create (), proto, max_in_core_cases);
90 }
91
92 /* Destroys casewindow CW.
93 Returns true if CW was tainted, which is caused by an I/O
94 error or by taint propagation to the casewindow. */
95 bool
casewindow_destroy(struct casewindow * cw)96 casewindow_destroy (struct casewindow *cw)
97 {
98 bool ok = true;
99 if (cw != NULL)
100 {
101 cw->class->destroy (cw->aux);
102 ok = taint_destroy (cw->taint);
103 caseproto_unref (cw->proto);
104 free (cw);
105 }
106 return ok;
107 }
108
109 /* Swaps the contents of casewindows A and B. */
110 static void
casewindow_swap(struct casewindow * a,struct casewindow * b)111 casewindow_swap (struct casewindow *a, struct casewindow *b)
112 {
113 struct casewindow tmp = *a;
114 *a = *b;
115 *b = tmp;
116 }
117
118 /* Dumps the contents of casewindow OLD to disk. */
119 static void
casewindow_to_disk(struct casewindow * old)120 casewindow_to_disk (struct casewindow *old)
121 {
122 struct casewindow *new;
123 new = do_casewindow_create (taint_clone (old->taint), old->proto, 0);
124 while (casewindow_get_case_cnt (old) > 0 && !casewindow_error (new))
125 {
126 struct ccase *c = casewindow_get_case (old, 0);
127 if (c == NULL)
128 break;
129 casewindow_pop_tail (old, 1);
130 casewindow_push_head (new, c);
131 }
132 casewindow_swap (old, new);
133 casewindow_destroy (new);
134 }
135
136 /* Pushes case C at the head of casewindow CW.
137 Case C becomes owned by the casewindow. */
138 void
casewindow_push_head(struct casewindow * cw,struct ccase * c)139 casewindow_push_head (struct casewindow *cw, struct ccase *c)
140 {
141 if (!casewindow_error (cw))
142 {
143 cw->class->push_head (cw->aux, c);
144 if (!casewindow_error (cw))
145 {
146 casenumber case_cnt = cw->class->get_case_cnt (cw->aux);
147 if (case_cnt > cw->max_in_core_cases
148 && cw->class == &casewindow_memory_class)
149 casewindow_to_disk (cw);
150 }
151 }
152 else
153 case_unref (c);
154 }
155
156 /* Deletes CASE_CNT cases at the tail of casewindow CW. */
157 void
casewindow_pop_tail(struct casewindow * cw,casenumber case_cnt)158 casewindow_pop_tail (struct casewindow *cw, casenumber case_cnt)
159 {
160 if (!casewindow_error (cw))
161 cw->class->pop_tail (cw->aux, case_cnt);
162 }
163
164 /* Returns the case that is CASE_IDX cases away from CW's tail
165 into C, or a null pointer on an I/O error or if CW is
166 otherwise tainted. The caller must call case_unref() on the
167 returned case when it is no longer needed. */
168 struct ccase *
casewindow_get_case(const struct casewindow * cw_,casenumber case_idx)169 casewindow_get_case (const struct casewindow *cw_, casenumber case_idx)
170 {
171 struct casewindow *cw = CONST_CAST (struct casewindow *, cw_);
172
173 assert (case_idx >= 0 && case_idx < casewindow_get_case_cnt (cw));
174 if (casewindow_error (cw))
175 return NULL;
176 return cw->class->get_case (cw->aux, case_idx);
177 }
178
179 /* Returns the number of cases in casewindow CW. */
180 casenumber
casewindow_get_case_cnt(const struct casewindow * cw)181 casewindow_get_case_cnt (const struct casewindow *cw)
182 {
183 return cw->class->get_case_cnt (cw->aux);
184 }
185
186 /* Returns the case prototype for the cases in casewindow CW.
187 The caller must not unref the returned prototype. */
188 const struct caseproto *
casewindow_get_proto(const struct casewindow * cw)189 casewindow_get_proto (const struct casewindow *cw)
190 {
191 return cw->proto;
192 }
193
194 /* Returns true if casewindow CW is tainted.
195 A casewindow is tainted by an I/O error or by taint
196 propagation to the casewindow. */
197 bool
casewindow_error(const struct casewindow * cw)198 casewindow_error (const struct casewindow *cw)
199 {
200 return taint_is_tainted (cw->taint);
201 }
202
203 /* Marks casewindow CW tainted. */
204 void
casewindow_force_error(struct casewindow * cw)205 casewindow_force_error (struct casewindow *cw)
206 {
207 taint_set_taint (cw->taint);
208 }
209
210 /* Returns casewindow CW's taint object. */
211 const struct taint *
casewindow_get_taint(const struct casewindow * cw)212 casewindow_get_taint (const struct casewindow *cw)
213 {
214 return cw->taint;
215 }
216
217 /* In-memory casewindow data. */
218 struct casewindow_memory
219 {
220 struct deque deque;
221 struct ccase **cases;
222 };
223
224 static void *
casewindow_memory_create(struct taint * taint UNUSED,const struct caseproto * proto UNUSED)225 casewindow_memory_create (struct taint *taint UNUSED,
226 const struct caseproto *proto UNUSED)
227 {
228 struct casewindow_memory *cwm = xmalloc (sizeof *cwm);
229 cwm->cases = deque_init (&cwm->deque, 4, sizeof *cwm->cases);
230 return cwm;
231 }
232
233 static void
casewindow_memory_destroy(void * cwm_)234 casewindow_memory_destroy (void *cwm_)
235 {
236 struct casewindow_memory *cwm = cwm_;
237 while (!deque_is_empty (&cwm->deque))
238 case_unref (cwm->cases[deque_pop_front (&cwm->deque)]);
239 free (cwm->cases);
240 free (cwm);
241 }
242
243 static void
casewindow_memory_push_head(void * cwm_,struct ccase * c)244 casewindow_memory_push_head (void *cwm_, struct ccase *c)
245 {
246 struct casewindow_memory *cwm = cwm_;
247 if (deque_is_full (&cwm->deque))
248 cwm->cases = deque_expand (&cwm->deque, cwm->cases, sizeof *cwm->cases);
249 cwm->cases[deque_push_back (&cwm->deque)] = c;
250 }
251
252 static void
casewindow_memory_pop_tail(void * cwm_,casenumber case_cnt)253 casewindow_memory_pop_tail (void *cwm_, casenumber case_cnt)
254 {
255 struct casewindow_memory *cwm = cwm_;
256 assert (deque_count (&cwm->deque) >= case_cnt);
257 while (case_cnt-- > 0)
258 case_unref (cwm->cases[deque_pop_front (&cwm->deque)]);
259 }
260
261 static struct ccase *
casewindow_memory_get_case(void * cwm_,casenumber ofs)262 casewindow_memory_get_case (void *cwm_, casenumber ofs)
263 {
264 struct casewindow_memory *cwm = cwm_;
265 return case_ref (cwm->cases[deque_front (&cwm->deque, ofs)]);
266 }
267
268 static casenumber
casewindow_memory_get_case_cnt(const void * cwm_)269 casewindow_memory_get_case_cnt (const void *cwm_)
270 {
271 const struct casewindow_memory *cwm = cwm_;
272 return deque_count (&cwm->deque);
273 }
274
275 static const struct casewindow_class casewindow_memory_class =
276 {
277 casewindow_memory_create,
278 casewindow_memory_destroy,
279 casewindow_memory_push_head,
280 casewindow_memory_pop_tail,
281 casewindow_memory_get_case,
282 casewindow_memory_get_case_cnt,
283 };
284
285 /* On-disk casewindow data. */
286 struct casewindow_file
287 {
288 struct case_tmpfile *file;
289 casenumber head, tail;
290 };
291
292 static void *
casewindow_file_create(struct taint * taint,const struct caseproto * proto)293 casewindow_file_create (struct taint *taint, const struct caseproto *proto)
294 {
295 struct casewindow_file *cwf = xmalloc (sizeof *cwf);
296 cwf->file = case_tmpfile_create (proto);
297 cwf->head = cwf->tail = 0;
298 taint_propagate (case_tmpfile_get_taint (cwf->file), taint);
299 return cwf;
300 }
301
302 static void
casewindow_file_destroy(void * cwf_)303 casewindow_file_destroy (void *cwf_)
304 {
305 struct casewindow_file *cwf = cwf_;
306 case_tmpfile_destroy (cwf->file);
307 free (cwf);
308 }
309
310 static void
casewindow_file_push_head(void * cwf_,struct ccase * c)311 casewindow_file_push_head (void *cwf_, struct ccase *c)
312 {
313 struct casewindow_file *cwf = cwf_;
314 if (case_tmpfile_put_case (cwf->file, cwf->head, c))
315 cwf->head++;
316 }
317
318 static void
casewindow_file_pop_tail(void * cwf_,casenumber cnt)319 casewindow_file_pop_tail (void *cwf_, casenumber cnt)
320 {
321 struct casewindow_file *cwf = cwf_;
322 assert (cnt <= cwf->head - cwf->tail);
323 cwf->tail += cnt;
324 if (cwf->head == cwf->tail)
325 cwf->head = cwf->tail = 0;
326 }
327
328 static struct ccase *
casewindow_file_get_case(void * cwf_,casenumber ofs)329 casewindow_file_get_case (void *cwf_, casenumber ofs)
330 {
331 struct casewindow_file *cwf = cwf_;
332 return case_tmpfile_get_case (cwf->file, cwf->tail + ofs);
333 }
334
335 static casenumber
casewindow_file_get_case_cnt(const void * cwf_)336 casewindow_file_get_case_cnt (const void *cwf_)
337 {
338 const struct casewindow_file *cwf = cwf_;
339 return cwf->head - cwf->tail;
340 }
341
342 static const struct casewindow_class casewindow_file_class =
343 {
344 casewindow_file_create,
345 casewindow_file_destroy,
346 casewindow_file_push_head,
347 casewindow_file_pop_tail,
348 casewindow_file_get_case,
349 casewindow_file_get_case_cnt,
350 };
351