1 /* Part of SWI-Prolog
2
3 Author: Jan Wielemaker
4 E-mail: J.Wielemaker@vu.nl
5 WWW: http://www.swi-prolog.org
6 Copyright (c) 2011-2017, VU University Amsterdam
7 All rights reserved.
8
9 Redistribution and use in source and binary forms, with or without
10 modification, are permitted provided that the following conditions
11 are met:
12
13 1. Redistributions of source code must retain the above copyright
14 notice, this list of conditions and the following disclaimer.
15
16 2. Redistributions in binary form must reproduce the above copyright
17 notice, this list of conditions and the following disclaimer in
18 the documentation and/or other materials provided with the
19 distribution.
20
21 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 POSSIBILITY OF SUCH DAMAGE.
33 */
34
35 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
36 RDF-DB query management. This module keeps track of running queries. We
37 need this for GC purposes. In particular, we need to:
38
39 * Find the oldest active generation.
40 * Get a signal if all currently active queries have finished.
41
42 In addition to queries, this module performs the necessary logic on
43 generations:
44
45 * Is an object visible in a query?
46 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
47
48 #ifdef HAVE_CONFIG_H
49 #include <config.h>
50 #endif
51 #include <SWI-Stream.h>
52 #include <SWI-Prolog.h>
53 #include <string.h>
54 #include <assert.h>
55 #include "rdf_db.h"
56 #include "query.h"
57 #include "memory.h"
58 #include "mutex.h"
59 #include "buffer.h"
60
61 static void init_query_stack(rdf_db *db, query_stack *qs);
62
63
64 /*******************************
65 * THREAD DATA *
66 *******************************/
67
68 /* Return the per-thread data for the given DB. This version uses no
69 locks for the common case that the required datastructures are
70 already provided.
71 */
72
73 thread_info *
rdf_thread_info(rdf_db * db,int tid)74 rdf_thread_info(rdf_db *db, int tid)
75 { query_admin *qa = &db->queries;
76 per_thread *td = &qa->query.per_thread;
77 thread_info *ti;
78 size_t idx = MSB(tid);
79
80 if ( !td->blocks[idx] )
81 { simpleMutexLock(&qa->query.lock);
82 if ( !td->blocks[idx] )
83 { size_t bs = BLOCKLEN(idx);
84 thread_info **newblock = rdf_malloc(db, bs*sizeof(thread_info*));
85
86 memset(newblock, 0, bs*sizeof(thread_info*));
87
88 td->blocks[idx] = newblock-bs;
89 }
90 simpleMutexUnlock(&qa->query.lock);
91 }
92
93 if ( !(ti=td->blocks[idx][tid]) )
94 { simpleMutexLock(&qa->query.lock);
95 if ( !(ti=td->blocks[idx][tid]) )
96 { ti = rdf_malloc(db, sizeof(*ti));
97 memset(ti, 0, sizeof(*ti));
98 init_query_stack(db, &ti->queries);
99 MEMORY_BARRIER();
100 td->blocks[idx][tid] = ti;
101 if ( tid > qa->query.thread_max )
102 qa->query.thread_max = tid;
103 }
104 simpleMutexUnlock(&qa->query.lock);
105 }
106
107 return ti;
108 }
109
110
111 gen_t
oldest_query_geneneration(rdf_db * db,gen_t * reindex_gen)112 oldest_query_geneneration(rdf_db *db, gen_t *reindex_gen)
113 { int tid;
114 gen_t gen = db->snapshots.keep;
115 gen_t ren = GEN_MAX;
116 query_admin *qa = &db->queries;
117 per_thread *td = &qa->query.per_thread;
118
119 DEBUG(20,
120 if ( db->snapshots.keep != GEN_MAX )
121 { char buf[64];
122 Sdprintf("Oldest snapshot gen = %s\n",
123 gen_name(db->snapshots.keep, buf));
124 });
125
126 for(tid=1; tid <= qa->query.thread_max; tid++)
127 { thread_info **tis;
128 thread_info *ti;
129
130 if ( (tis=td->blocks[MSB(tid)]) &&
131 (ti=tis[tid]) )
132 { query_stack *qs = &ti->queries;
133
134 if ( qs->top > 0 )
135 { query *q = &qs->preallocated[0];
136
137 DEBUG(10,
138 { char buf[20];
139 Sdprintf("Thread %d: %d queries; oldest gen %s\n",
140 tid, qs->top, gen_name(q->rd_gen, buf));
141 });
142
143 if ( q->rd_gen < gen )
144 gen = q->rd_gen;
145 if ( q->reindex_gen < ren )
146 ren = q->reindex_gen;
147 } else
148 { DEBUG(11, Sdprintf("Thread %d: no queries\n", tid));
149 }
150 }
151 }
152
153 if ( reindex_gen )
154 *reindex_gen = ren;
155
156 return gen;
157 }
158
159
160
161 /*******************************
162 * QUERY-STACK *
163 *******************************/
164
165 static void
preinit_query(rdf_db * db,query_stack * qs,query * q,query * parent,int depth)166 preinit_query(rdf_db *db, query_stack *qs, query *q, query *parent, int depth)
167 { q->db = db;
168 q->stack = qs;
169 q->parent = q;
170 q->depth = depth;
171 }
172
173
174 static void
init_query_stack(rdf_db * db,query_stack * qs)175 init_query_stack(rdf_db *db, query_stack *qs)
176 { int tid = PL_thread_self();
177 int i;
178 int prealloc = sizeof(qs->preallocated)/sizeof(qs->preallocated[0]);
179 query *parent = NULL;
180
181 memset(qs, 0, sizeof(*qs));
182
183 simpleMutexInit(&qs->lock);
184 qs->db = db;
185 qs->tr_gen_base = GEN_TBASE + tid*GEN_TNEST;
186 qs->tr_gen_max = qs->tr_gen_base + (GEN_TNEST-1);
187
188 for(i=0; i<MSB(prealloc); i++)
189 qs->blocks[i] = qs->preallocated;
190 for(i=0; i<prealloc; i++)
191 { query *q = &qs->preallocated[i];
192
193 preinit_query(db, qs, q, parent, i);
194 parent = q;
195 }
196 }
197
198
199 static query *
alloc_query(query_stack * qs)200 alloc_query(query_stack *qs)
201 { int depth = qs->top;
202 int b = MSB(depth);
203
204 if ( b >= MAX_QBLOCKS )
205 { PL_resource_error("open_rdf_queries");
206 return NULL;
207 }
208
209 if ( qs->blocks[b] )
210 { query *q = &qs->blocks[b][depth];
211
212 assert(q->stack);
213
214 return q;
215 }
216
217 simpleMutexLock(&qs->lock);
218 if ( !qs->blocks[b] )
219 { size_t bytes = BLOCKLEN(b) * sizeof(query);
220 query *ql = rdf_malloc(qs->db, bytes);
221 query *parent;
222 int i;
223
224 if ( !ql )
225 { simpleMutexUnlock(&qs->lock);
226 PL_resource_error("memory");
227 return NULL;
228 }
229
230 memset(ql, 0, bytes);
231 ql -= depth; /* rebase */
232 parent = &qs->blocks[b-1][depth-1];
233 for(i=depth; i<depth*2; i++)
234 { query *q = &ql[i];
235 preinit_query(qs->db, qs, q, parent, i);
236 parent = q;
237 }
238 MEMORY_BARRIER();
239 qs->blocks[b] = ql;
240 }
241 simpleMutexUnlock(&qs->lock);
242
243 return &qs->blocks[b][depth];
244 }
245
246
247 static void
push_query(rdf_db * db,query * q)248 push_query(rdf_db *db, query *q)
249 { enter_scan(&db->defer_all);
250 q->stack->top++;
251 }
252
253
254 static void
pop_query(rdf_db * db,query * q)255 pop_query(rdf_db *db, query *q)
256 { q->stack->top--;
257 exit_scan(&db->defer_all);
258 }
259
260
261 query *
open_query(rdf_db * db)262 open_query(rdf_db *db)
263 { int tid = PL_thread_self();
264 thread_info *ti = rdf_thread_info(db, tid);
265 query *q = alloc_query(&ti->queries);
266
267 if ( !q ) return NULL;
268 q->type = Q_NORMAL;
269 q->transaction = ti->queries.transaction;
270 q->reindex_gen = db->reindexed;
271 if ( q->transaction ) /* Query inside a transaction */
272 { q->rd_gen = q->transaction->rd_gen;
273 q->tr_gen = q->transaction->wr_gen;
274 q->wr_gen = q->transaction->wr_gen;
275 } else
276 { q->rd_gen = db->queries.generation;
277 q->tr_gen = GEN_TBASE;
278 q->wr_gen = GEN_UNDEF;
279 }
280
281 push_query(db, q);
282
283 return q;
284 }
285
286
287 query *
open_transaction(rdf_db * db,triple_buffer * added,triple_buffer * deleted,triple_buffer * updated,snapshot * ss)288 open_transaction(rdf_db *db,
289 triple_buffer *added,
290 triple_buffer *deleted,
291 triple_buffer *updated,
292 snapshot *ss)
293 { int tid = PL_thread_self();
294 thread_info *ti = rdf_thread_info(db, tid);
295 query *q = alloc_query(&ti->queries);
296
297 if ( !q ) return NULL;
298 q->type = Q_TRANSACTION;
299 q->transaction = ti->queries.transaction;
300 q->reindex_gen = GEN_MAX; /* should not get this down */
301
302 if ( ss && ss != SNAPSHOT_ANONYMOUS )
303 { int ss_tid = snapshot_thread(ss);
304 assert(!ss_tid || ss_tid == tid);
305 (void)ss_tid;
306
307 q->rd_gen = ss->rd_gen;
308 q->tr_gen = ss->tr_gen;
309 } else if ( q->transaction ) /* nested transaction */
310 { q->rd_gen = q->transaction->rd_gen;
311 q->tr_gen = q->transaction->wr_gen;
312 } else
313 { q->rd_gen = db->queries.generation;
314 q->tr_gen = ti->queries.tr_gen_base;
315 }
316
317 q->wr_gen = q->tr_gen;
318 ti->queries.transaction = q;
319
320 init_triple_buffer(added);
321 init_triple_buffer(deleted);
322 init_triple_buffer(updated);
323 q->transaction_data.added = added;
324 q->transaction_data.deleted = deleted;
325 q->transaction_data.updated = updated;
326
327 push_query(db, q);
328
329 return q;
330 }
331
332
333 void
close_query(query * q)334 close_query(query *q)
335 { pop_query(q->db, q);
336 }
337
338
339 int
empty_transaction(query * q)340 empty_transaction(query *q)
341 { return ( is_empty_buffer(q->transaction_data.added) &&
342 is_empty_buffer(q->transaction_data.deleted) &&
343 is_empty_buffer(q->transaction_data.updated) );
344 }
345
346
347 /*******************************
348 * ADMIN *
349 *******************************/
350
351 void
init_query_admin(rdf_db * db)352 init_query_admin(rdf_db *db)
353 { query_admin *qa = &db->queries;
354
355 memset(qa, 0, sizeof(*qa));
356 simpleMutexInit(&qa->query.lock);
357 simpleMutexInit(&qa->write.lock);
358 simpleMutexInit(&qa->write.generation_lock);
359 }
360
361
362 /*******************************
363 * GENERATIONS *
364 *******************************/
365
/* alive_lifespan() is true if a lifespan is visible inside a query.

   A lifespan is alive if the query generation is inside it,
   but with transactions there are two problems:

   - If the triple is deleted by a parent transaction it is dead
   - If the triple is created by a parent transaction it is alive
*/
374
375 char *
gen_name(gen_t gen,char * buf)376 gen_name(gen_t gen, char *buf)
377 { static char tmp[24];
378
379 if ( !buf )
380 buf = tmp;
381 if ( gen == GEN_UNDEF ) return "GEN_UNDEF";
382 if ( gen == GEN_MAX ) return "GEN_MAX";
383 if ( gen == GEN_PREHIST ) return "GEN_PREHIST";
384 if ( gen >= GEN_TBASE )
385 { int tid = (gen-GEN_TBASE)/GEN_TNEST;
386 gen_t r = (gen-GEN_TBASE)%GEN_TNEST;
387
388 if ( r == GEN_TNEST-1 )
389 Ssprintf(buf, "T%d+GEN_TNEST", tid);
390 else
391 Ssprintf(buf, "T%d+%lld", tid, (int64_t)r);
392 return buf;
393 }
394 Ssprintf(buf, "%lld", (int64_t)gen);
395 return buf;
396 }
397
398
399
400 int
alive_lifespan(query * q,lifespan * lifespan)401 alive_lifespan(query *q, lifespan *lifespan)
402 { DEBUG(2,
403 { char b[4][24];
404
405 Sdprintf("q: rd_gen=%s; tr_gen=%s; t: %s..%s\n",
406 gen_name(q->rd_gen, b[0]),
407 gen_name(q->tr_gen, b[1]),
408 gen_name(lifespan->born, b[2]),
409 gen_name(lifespan->died, b[3]));
410 });
411
412 if ( q->rd_gen >= lifespan->born &&
413 q->rd_gen < lifespan->died )
414 { if ( is_wr_transaction_gen(q, lifespan->died) &&
415 q->tr_gen >= lifespan->died )
416 return FALSE;
417
418 return TRUE;
419 } else /* created/died in transaction */
420 { if ( is_wr_transaction_gen(q, lifespan->born) )
421 { if ( q->tr_gen >= lifespan->born &&
422 q->tr_gen < lifespan->died )
423 return TRUE;
424 }
425 }
426
427 return FALSE;
428 }
429
430
431 int
born_lifespan(query * q,lifespan * lifespan)432 born_lifespan(query *q, lifespan *lifespan)
433 { if ( q->rd_gen >= lifespan->born )
434 return TRUE;
435
436 if ( is_wr_transaction_gen(q, lifespan->born) &&
437 q->tr_gen >= lifespan->born )
438 return TRUE;
439
440 return FALSE;
441 }
442
443
444 /*******************************
445 * TRIPLE MANIPULATION *
446 *******************************/
447
448 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
449 We have three basic triple manipulations:
450
451 - Add triples
452 - Delete triples
453 - Updated triples (expressed as deleting and adding)
454
455 add_triples() adds an array of triples to the database, stepping the
456 database generation by 1. Calls to add triples must be synchronized with
457 other addition calls, but not with read nor delete operations. This
458 synchronization is needed because without we cannot set the generation
459 for new queries to a proper value.
460
461 To reduce the locked time, we perform this in multiple steps:
462
463 - prelink_triple() performs tasks that do not affect the remainder of
464 the database.
  - In the link-phase, we add the triples in packages of ADD_CHUNK_SIZE
    to the database, but addressed in the far future.  No reader sees
    what we are doing.
468 - Next, we grab the generation_lock and update the triples to
469 the next generation and increment the generation to make them
470 visible.
  - Finally, we do some post-hoc work to update statistics.
472 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
473
474 #define ADD_CHUNK_SIZE 50
475
476 int
add_triples(query * q,triple ** triples,size_t count)477 add_triples(query *q, triple **triples, size_t count)
478 { rdf_db *db = q->db;
479 gen_t gen, gen_max;
480 triple **ep = triples+count;
481 triple **tp;
482
483 /* pre-lock phase */
484 for(tp=triples; tp < ep; tp++)
485 prelink_triple(db, *tp, q);
486 consider_triple_rehash(db, count);
487
488 /* Add the triples in the future */
489 gen_max = query_max_gen(q);
490 for(tp=triples; tp < ep; )
491 { triple **echunk = tp+50;
492
493 if ( echunk > ep )
494 echunk = ep;
495
496 simpleMutexLock(&db->queries.write.lock);
497 for(; tp<echunk; tp++)
498 { triple *t = *tp;
499
500 t->lifespan.born = gen_max;
501 t->lifespan.died = gen_max;
502 link_triple(db, t, q);
503 }
504 simpleMutexUnlock(&db->queries.write.lock);
505 }
506
507 /* generation update */
508 simpleMutexLock(&db->queries.write.generation_lock);
509 gen = queryWriteGen(q)+1;
510 for(tp=triples; tp < ep; tp++)
511 { triple *t = *tp;
512
513 t->lifespan.born = gen;
514 }
515 setWriteGen(q, gen);
516 simpleMutexUnlock(&db->queries.write.generation_lock);
517
518 if ( q->transaction )
519 { for(tp=triples; tp < ep; tp++)
520 { triple *t = *tp;
521
522 postlink_triple(db, t, q);
523 buffer_triple(q->transaction->transaction_data.added, t);
524 }
525 } else
526 { for(tp=triples; tp < ep; tp++)
527 { triple *t = *tp;
528
529 postlink_triple(db, t, q);
530 }
531
532 if ( rdf_is_broadcasting(EV_ASSERT|EV_ASSERT_LOAD) )
533 { for(tp=triples; tp < ep; tp++)
534 { triple *t = *tp;
535 broadcast_id id = t->loaded ? EV_ASSERT_LOAD : EV_ASSERT;
536
537 if ( !rdf_broadcast(id, t, NULL) )
538 return FALSE;
539 }
540 }
541 }
542
543 return TRUE;
544 }
545
546
547 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
548 del_triples() deletes triples from the database. There are two actions:
549
550 - del_triple_consequences() deletes (entailment) consequences of
551 erasing the triple. Currently this is handling subPropertyOf
552 entailment. This doesn't remove the triple, but merely invalidates
553 the subPropertyOf reachability matrix for subsequent generations.
554 - erase_triple() is called on the final commit and updates statistics.
555 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
556
557 int
del_triples(query * q,triple ** triples,size_t count)558 del_triples(query *q, triple **triples, size_t count)
559 { rdf_db *db = q->db;
560 gen_t gen;
561 triple **ep = triples+count;
562 triple **tp;
563
564 if ( count == 0 )
565 return TRUE;
566 else
567 rdf_create_gc_thread(db);
568
569 simpleMutexLock(&db->queries.write.generation_lock);
570 simpleMutexLock(&db->queries.write.lock);
571 gen = queryWriteGen(q) + 1;
572
573 for(tp=triples; tp < ep; tp++)
574 { triple *t = deref_triple(db, *tp);
575
576 t->lifespan.died = gen;
577 del_triple_consequences(db, t, q);
578
579 if ( q->transaction )
580 buffer_triple(q->transaction->transaction_data.deleted, t);
581 else
582 erase_triple(db, t, q);
583 }
584
585 setWriteGen(q, gen);
586 simpleMutexUnlock(&db->queries.write.lock);
587 simpleMutexUnlock(&db->queries.write.generation_lock);
588
589 if ( !q->transaction && rdf_is_broadcasting(EV_RETRACT) )
590 { for(tp=triples; tp < ep; tp++)
591 { triple *t = deref_triple(db, *tp);
592
593 if ( !rdf_broadcast(EV_RETRACT, t, NULL) )
594 return FALSE;
595 }
596 }
597
598 return TRUE;
599 }
600
601
602 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
603 update_triples() updates an array of triples.
604 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
605
606 int
update_triples(query * q,triple ** old,triple ** new,size_t count)607 update_triples(query *q,
608 triple **old, triple **new,
609 size_t count)
610 { rdf_db *db = q->db;
611 gen_t gen, gen_max;
612 triple **eo = old+count;
613 triple **en = new+count;
614 triple **to, **tn;
615 size_t updated = 0;
616
617 if ( count == 0 )
618 return TRUE;
619 else
620 rdf_create_gc_thread(db);
621
622 for(tn=new; tn < en; tn++)
623 { triple *t = *tn;
624
625 if ( t )
626 prelink_triple(db, t, q);
627 }
628
629 simpleMutexLock(&db->queries.write.generation_lock);
630 simpleMutexLock(&db->queries.write.lock);
631 gen = queryWriteGen(q) + 1;
632 gen_max = query_max_gen(q);
633
634 for(to=old,tn=new; to < eo; to++,tn++)
635 { if ( *tn )
636 { triple *n = *tn; /* new, cannot be reindexed */
637 triple *o = deref_triple(db, *to);
638
639 o->lifespan.died = gen;
640 n->lifespan.born = gen;
641 n->lifespan.died = gen_max;
642 link_triple(db, *tn, q);
643 del_triple_consequences(db, o, q);
644 if ( q->transaction )
645 { buffer_triple(q->transaction->transaction_data.updated, *to);
646 buffer_triple(q->transaction->transaction_data.updated, *tn);
647 } else
648 { erase_triple(db, *to, q);
649 }
650
651 updated++;
652 }
653 }
654
655 setWriteGen(q, gen);
656 simpleMutexUnlock(&db->queries.write.lock);
657 simpleMutexUnlock(&db->queries.write.generation_lock);
658
659 consider_triple_rehash(db, 1);
660
661 if ( !q->transaction && rdf_is_broadcasting(EV_UPDATE) )
662 { for(to=old,tn=new; to < eo; to++,tn++)
663 { triple *t = *tn;
664
665 if ( t )
666 { postlink_triple(db, *tn, q);
667
668 if ( !rdf_broadcast(EV_UPDATE, *to, *tn) )
669 return FALSE;
670 }
671 }
672 } else
673 { for(tn=new; tn < en; tn++)
674 { triple *t = *tn;
675
676 if ( t )
677 postlink_triple(db, t, q);
678 }
679 }
680
681 return TRUE;
682 }
683
684
685 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
686 Matrices used in a transaction must be discarded because a new
687 transaction will use the same generation numbers, but typically for
688 different modifications.
689
690 TBD: Hand some statistics to GC, such that we know that there are
691 matrices to collect.
692 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
693
694 static void
invalidate_lifespans_transaction(query * q)695 invalidate_lifespans_transaction(query *q)
696 { cell *c, *next;
697
698 for(c=q->transaction_data.lifespans.head; c; c=next)
699 { lifespan *span = c->value;
700
701 next = c->next;
702 span->died = GEN_PREHIST;
703 rdf_free(q->db, c, sizeof(*c));
704 }
705
706 q->transaction_data.lifespans.head = NULL;
707 q->transaction_data.lifespans.tail = NULL;
708 }
709
710
711 void
close_transaction(query * q)712 close_transaction(query *q)
713 { assert(q->type == Q_TRANSACTION);
714
715 free_triple_buffer(q->transaction_data.added);
716 free_triple_buffer(q->transaction_data.deleted);
717 free_triple_buffer(q->transaction_data.updated);
718 invalidate_lifespans_transaction(q);
719
720 q->stack->transaction = q->transaction;
721
722 close_query(q);
723 }
724
725
726 static void
commit_add(query * q,gen_t gen_max,gen_t gen,triple * t)727 commit_add(query *q, gen_t gen_max, gen_t gen, triple *t)
728 { t = deref_triple(q->db, t);
729
730 if ( t->lifespan.died == gen_max )
731 { t->lifespan.born = gen;
732 add_triple_consequences(q->db, t, q);
733 if ( q->transaction )
734 buffer_triple(q->transaction->transaction_data.added, t);
735 else
736 t->lifespan.died = GEN_MAX;
737 }
738 }
739
740
741 static void
commit_del(query * q,gen_t gen,triple * t)742 commit_del(query *q, gen_t gen, triple *t)
743 { t = deref_triple(q->db, t);
744
745 if ( is_wr_transaction_gen(q, t->lifespan.died) )
746 { t->lifespan.died = gen;
747 if ( q->transaction )
748 { del_triple_consequences(q->db, t, q);
749 buffer_triple(q->transaction->transaction_data.deleted, t);
750 } else
751 { erase_triple(q->db, t, q);
752 }
753 }
754 }
755
756
757 int
commit_transaction(query * q)758 commit_transaction(query *q)
759 { rdf_db *db = q->db;
760 triple **tp;
761 gen_t gen, gen_max;
762
763 simpleMutexLock(&db->queries.write.generation_lock);
764 simpleMutexLock(&db->queries.write.lock);
765 gen = queryWriteGen(q) + 1;
766 gen_max = transaction_max_gen(q);
767 /* added triples */
768 for(tp=q->transaction_data.added->base;
769 tp<q->transaction_data.added->top;
770 tp++)
771 { commit_add(q, gen_max, gen, *tp);
772 }
773 /* deleted triples */
774 for(tp=q->transaction_data.deleted->base;
775 tp<q->transaction_data.deleted->top;
776 tp++)
777 { commit_del(q, gen, *tp);
778 }
779
780 /* updated triples */
781 for(tp=q->transaction_data.updated->base;
782 tp<q->transaction_data.updated->top;
783 tp += 2)
784 { triple *to = tp[0];
785 triple *tn = tp[1];
786
787 commit_del(q, gen, to);
788 commit_add(q, gen_max, gen, tn);
789 }
790
791 setWriteGen(q, gen);
792 simpleMutexUnlock(&db->queries.write.lock);
793 simpleMutexUnlock(&db->queries.write.generation_lock);
794
795 q->stack->transaction = q->transaction; /* do not nest monitor calls */
796 /* inside the transaction */
797
798 /* Broadcast new triples */
799 if ( !q->transaction )
800 { if ( rdf_is_broadcasting(EV_RETRACT) )
801 { for(tp=q->transaction_data.deleted->base;
802 tp<q->transaction_data.deleted->top;
803 tp++)
804 { triple *t = *tp;
805
806 if ( t->lifespan.died == gen )
807 { if ( !rdf_broadcast(EV_RETRACT, t, NULL) )
808 return FALSE;
809 }
810 }
811 }
812
813 if ( rdf_is_broadcasting(EV_ASSERT|EV_ASSERT_LOAD) )
814 { for(tp=q->transaction_data.added->base;
815 tp<q->transaction_data.added->top;
816 tp++)
817 { triple *t = *tp;
818
819 if ( t->lifespan.born == gen )
820 { broadcast_id id = t->loaded ? EV_ASSERT_LOAD : EV_ASSERT;
821
822 if ( !rdf_broadcast(id, t, NULL) )
823 return FALSE;
824 }
825 }
826 }
827
828 if ( rdf_is_broadcasting(EV_UPDATE) )
829 { for(tp=q->transaction_data.updated->base;
830 tp<q->transaction_data.updated->top;
831 tp += 2)
832 { triple *to = tp[0];
833 triple *tn = tp[1];
834
835 if ( to->lifespan.died == gen &&
836 tn->lifespan.born == gen )
837 { if ( !rdf_broadcast(EV_UPDATE, to, tn) )
838 return FALSE;
839 }
840 }
841 }
842 }
843
844 close_transaction(q);
845
846 return TRUE;
847 }
848
849
850 /* TBD: What if someone else deleted this triple too? We can check
851 that by discovering multiple changes to the died generation.
852 */
853
854 int
discard_transaction(query * q)855 discard_transaction(query *q)
856 { rdf_db *db = q->db;
857 triple **tp;
858 gen_t gen_max = transaction_max_gen(q);
859
860 for(tp=q->transaction_data.added->base;
861 tp<q->transaction_data.added->top;
862 tp++)
863 { triple *t = *tp;
864
865 /* revert creation of new */
866 if ( is_wr_transaction_gen(q, t->lifespan.born) )
867 { t = deref_triple(db, t);
868 t->lifespan.died = GEN_PREHIST;
869 erase_triple(db, t, q);
870 }
871 }
872
873 for(tp=q->transaction_data.deleted->base;
874 tp<q->transaction_data.deleted->top;
875 tp++)
876 { triple *t = *tp;
877
878 /* revert deletion of old */
879 if ( is_wr_transaction_gen(q, t->lifespan.died) )
880 { t = deref_triple(db, t);
881
882 t->lifespan.died = gen_max;
883 }
884 }
885
886 for(tp=q->transaction_data.updated->base;
887 tp<q->transaction_data.updated->top;
888 tp += 2)
889 { triple *to = tp[0];
890 triple *tn = tp[1];
891
892 /* revert deletion of old */
893 if ( is_wr_transaction_gen(q, to->lifespan.died) )
894 { to = deref_triple(db, to);
895
896 to->lifespan.died = gen_max;
897 }
898 /* revert creation of new */
899 if ( is_wr_transaction_gen(q, tn->lifespan.born) &&
900 tn->lifespan.died == gen_max )
901 { tn = deref_triple(db, tn);
902 tn->lifespan.died = GEN_PREHIST;
903 erase_triple(db, tn, q);
904 }
905 }
906
907 close_transaction(q);
908
909 return TRUE;
910 }
911