1 /*  Part of SWI-Prolog
2 
3     Author:        Jan Wielemaker
4     E-mail:        J.Wielemaker@vu.nl
5     WWW:           http://www.swi-prolog.org
6     Copyright (c)  2011-2017, VU University Amsterdam
7     All rights reserved.
8 
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions
11     are met:
12 
13     1. Redistributions of source code must retain the above copyright
14        notice, this list of conditions and the following disclaimer.
15 
16     2. Redistributions in binary form must reproduce the above copyright
17        notice, this list of conditions and the following disclaimer in
18        the documentation and/or other materials provided with the
19        distribution.
20 
21     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22     "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23     LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24     FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25     COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26     INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27     BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28     LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30     LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31     ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32     POSSIBILITY OF SUCH DAMAGE.
33 */
34 
35 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
36 RDF-DB query management. This module keeps  track of running queries. We
37 need this for GC purposes.  In particular, we need to:
38 
39     * Find the oldest active generation.
40     * Get a signal if all currently active queries have finished.
41 
42 In addition to queries, this  module   performs  the  necessary logic on
43 generations:
44 
45     * Is an object visible in a query?
46 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
47 
48 #ifdef HAVE_CONFIG_H
49 #include <config.h>
50 #endif
51 #include <SWI-Stream.h>
52 #include <SWI-Prolog.h>
53 #include <string.h>
54 #include <assert.h>
55 #include "rdf_db.h"
56 #include "query.h"
57 #include "memory.h"
58 #include "mutex.h"
59 #include "buffer.h"
60 
61 static void	init_query_stack(rdf_db *db, query_stack *qs);
62 
63 
64 		 /*******************************
65 		 *	    THREAD DATA		*
66 		 *******************************/
67 
68 /* Return the per-thread data for the given DB.  This version uses no
69    locks for the common case that the required datastructures are
70    already provided.
71 */
72 
73 thread_info *
rdf_thread_info(rdf_db * db,int tid)74 rdf_thread_info(rdf_db *db, int tid)
75 { query_admin *qa = &db->queries;
76   per_thread *td = &qa->query.per_thread;
77   thread_info *ti;
78   size_t idx = MSB(tid);
79 
80   if ( !td->blocks[idx] )
81   { simpleMutexLock(&qa->query.lock);
82     if ( !td->blocks[idx] )
83     { size_t bs = BLOCKLEN(idx);
84       thread_info **newblock = rdf_malloc(db, bs*sizeof(thread_info*));
85 
86       memset(newblock, 0, bs*sizeof(thread_info*));
87 
88       td->blocks[idx] = newblock-bs;
89     }
90     simpleMutexUnlock(&qa->query.lock);
91   }
92 
93   if ( !(ti=td->blocks[idx][tid]) )
94   { simpleMutexLock(&qa->query.lock);
95     if ( !(ti=td->blocks[idx][tid]) )
96     { ti = rdf_malloc(db, sizeof(*ti));
97       memset(ti, 0, sizeof(*ti));
98       init_query_stack(db, &ti->queries);
99       MEMORY_BARRIER();
100       td->blocks[idx][tid] = ti;
101       if ( tid > qa->query.thread_max )
102 	qa->query.thread_max = tid;
103     }
104     simpleMutexUnlock(&qa->query.lock);
105   }
106 
107   return ti;
108 }
109 
110 
111 gen_t
oldest_query_geneneration(rdf_db * db,gen_t * reindex_gen)112 oldest_query_geneneration(rdf_db *db, gen_t *reindex_gen)
113 { int tid;
114   gen_t gen = db->snapshots.keep;
115   gen_t ren = GEN_MAX;
116   query_admin *qa = &db->queries;
117   per_thread *td = &qa->query.per_thread;
118 
119   DEBUG(20,
120 	if ( db->snapshots.keep != GEN_MAX )
121 	{ char buf[64];
122 	  Sdprintf("Oldest snapshot gen = %s\n",
123 		   gen_name(db->snapshots.keep, buf));
124 	});
125 
126   for(tid=1; tid <= qa->query.thread_max; tid++)
127   { thread_info **tis;
128     thread_info *ti;
129 
130     if ( (tis=td->blocks[MSB(tid)]) &&
131 	 (ti=tis[tid]) )
132     { query_stack *qs = &ti->queries;
133 
134       if ( qs->top > 0 )
135       { query *q = &qs->preallocated[0];
136 
137 	DEBUG(10,
138 	      { char buf[20];
139 		Sdprintf("Thread %d: %d queries; oldest gen %s\n",
140 			 tid, qs->top, gen_name(q->rd_gen, buf));
141 	      });
142 
143 	if ( q->rd_gen < gen )
144 	  gen = q->rd_gen;
145 	if ( q->reindex_gen < ren )
146 	  ren = q->reindex_gen;
147       } else
148       { DEBUG(11, Sdprintf("Thread %d: no queries\n", tid));
149       }
150     }
151   }
152 
153   if ( reindex_gen )
154     *reindex_gen = ren;
155 
156   return gen;
157 }
158 
159 
160 
161 		 /*******************************
162 		 *	    QUERY-STACK		*
163 		 *******************************/
164 
165 static void
preinit_query(rdf_db * db,query_stack * qs,query * q,query * parent,int depth)166 preinit_query(rdf_db *db, query_stack *qs, query *q, query *parent, int depth)
167 { q->db     = db;
168   q->stack  = qs;
169   q->parent = q;
170   q->depth  = depth;
171 }
172 
173 
174 static void
init_query_stack(rdf_db * db,query_stack * qs)175 init_query_stack(rdf_db *db, query_stack *qs)
176 { int tid = PL_thread_self();
177   int i;
178   int prealloc = sizeof(qs->preallocated)/sizeof(qs->preallocated[0]);
179   query *parent = NULL;
180 
181   memset(qs, 0, sizeof(*qs));
182 
183   simpleMutexInit(&qs->lock);
184   qs->db = db;
185   qs->tr_gen_base = GEN_TBASE + tid*GEN_TNEST;
186   qs->tr_gen_max  = qs->tr_gen_base + (GEN_TNEST-1);
187 
188   for(i=0; i<MSB(prealloc); i++)
189     qs->blocks[i] = qs->preallocated;
190   for(i=0; i<prealloc; i++)
191   { query *q = &qs->preallocated[i];
192 
193     preinit_query(db, qs, q, parent, i);
194     parent = q;
195   }
196 }
197 
198 
199 static query *
alloc_query(query_stack * qs)200 alloc_query(query_stack *qs)
201 { int depth = qs->top;
202   int b = MSB(depth);
203 
204   if ( b >= MAX_QBLOCKS )
205   { PL_resource_error("open_rdf_queries");
206     return NULL;
207   }
208 
209   if ( qs->blocks[b] )
210   { query *q = &qs->blocks[b][depth];
211 
212     assert(q->stack);
213 
214     return q;
215   }
216 
217   simpleMutexLock(&qs->lock);
218   if ( !qs->blocks[b] )
219   { size_t bytes = BLOCKLEN(b) * sizeof(query);
220     query *ql = rdf_malloc(qs->db, bytes);
221     query *parent;
222     int i;
223 
224     if ( !ql )
225     { simpleMutexUnlock(&qs->lock);
226       PL_resource_error("memory");
227       return NULL;
228     }
229 
230     memset(ql, 0, bytes);
231     ql -= depth;			/* rebase */
232     parent = &qs->blocks[b-1][depth-1];
233     for(i=depth; i<depth*2; i++)
234     { query *q = &ql[i];
235       preinit_query(qs->db, qs, q, parent, i);
236       parent = q;
237     }
238     MEMORY_BARRIER();
239     qs->blocks[b] = ql;
240   }
241   simpleMutexUnlock(&qs->lock);
242 
243   return &qs->blocks[b][depth];
244 }
245 
246 
247 static void
push_query(rdf_db * db,query * q)248 push_query(rdf_db *db, query *q)
249 { enter_scan(&db->defer_all);
250   q->stack->top++;
251 }
252 
253 
254 static void
pop_query(rdf_db * db,query * q)255 pop_query(rdf_db *db, query *q)
256 { q->stack->top--;
257   exit_scan(&db->defer_all);
258 }
259 
260 
261 query *
open_query(rdf_db * db)262 open_query(rdf_db *db)
263 { int tid = PL_thread_self();
264   thread_info *ti = rdf_thread_info(db, tid);
265   query *q = alloc_query(&ti->queries);
266 
267   if ( !q ) return NULL;
268   q->type = Q_NORMAL;
269   q->transaction = ti->queries.transaction;
270   q->reindex_gen = db->reindexed;
271   if ( q->transaction )			/* Query inside a transaction */
272   { q->rd_gen = q->transaction->rd_gen;
273     q->tr_gen = q->transaction->wr_gen;
274     q->wr_gen = q->transaction->wr_gen;
275   } else
276   { q->rd_gen = db->queries.generation;
277     q->tr_gen = GEN_TBASE;
278     q->wr_gen = GEN_UNDEF;
279   }
280 
281   push_query(db, q);
282 
283   return q;
284 }
285 
286 
287 query *
open_transaction(rdf_db * db,triple_buffer * added,triple_buffer * deleted,triple_buffer * updated,snapshot * ss)288 open_transaction(rdf_db *db,
289 		 triple_buffer *added,
290 		 triple_buffer *deleted,
291 		 triple_buffer *updated,
292 		 snapshot *ss)
293 { int tid = PL_thread_self();
294   thread_info *ti = rdf_thread_info(db, tid);
295   query *q = alloc_query(&ti->queries);
296 
297   if ( !q ) return NULL;
298   q->type = Q_TRANSACTION;
299   q->transaction = ti->queries.transaction;
300   q->reindex_gen = GEN_MAX;		/* should not get this down */
301 
302   if ( ss && ss != SNAPSHOT_ANONYMOUS )
303   { int ss_tid = snapshot_thread(ss);
304     assert(!ss_tid || ss_tid == tid);
305     (void)ss_tid;
306 
307     q->rd_gen = ss->rd_gen;
308     q->tr_gen = ss->tr_gen;
309   } else if ( q->transaction )		/* nested transaction */
310   { q->rd_gen = q->transaction->rd_gen;
311     q->tr_gen = q->transaction->wr_gen;
312   } else
313   { q->rd_gen = db->queries.generation;
314     q->tr_gen = ti->queries.tr_gen_base;
315   }
316 
317   q->wr_gen = q->tr_gen;
318   ti->queries.transaction = q;
319 
320   init_triple_buffer(added);
321   init_triple_buffer(deleted);
322   init_triple_buffer(updated);
323   q->transaction_data.added = added;
324   q->transaction_data.deleted = deleted;
325   q->transaction_data.updated = updated;
326 
327   push_query(db, q);
328 
329   return q;
330 }
331 
332 
333 void
close_query(query * q)334 close_query(query *q)
335 { pop_query(q->db, q);
336 }
337 
338 
339 int
empty_transaction(query * q)340 empty_transaction(query *q)
341 { return ( is_empty_buffer(q->transaction_data.added) &&
342 	   is_empty_buffer(q->transaction_data.deleted) &&
343 	   is_empty_buffer(q->transaction_data.updated) );
344 }
345 
346 
347 		 /*******************************
348 		 *	     ADMIN		*
349 		 *******************************/
350 
351 void
init_query_admin(rdf_db * db)352 init_query_admin(rdf_db *db)
353 { query_admin *qa = &db->queries;
354 
355   memset(qa, 0, sizeof(*qa));
356   simpleMutexInit(&qa->query.lock);
357   simpleMutexInit(&qa->write.lock);
358   simpleMutexInit(&qa->write.generation_lock);
359 }
360 
361 
362 		 /*******************************
363 		 *	    GENERATIONS		*
364 		 *******************************/
365 
/* alive_lifespan() is true if a lifespan is visible inside a query.
367 
368    A lifespan is alive if the query generation is inside it,
369    but with transactions there are two problems:
370 
371 	- If the triple is deleted by a parent transaction it is dead
372 	- If the triple is created by a parent transaction it is alive
373 */
374 
375 char *
gen_name(gen_t gen,char * buf)376 gen_name(gen_t gen, char *buf)
377 { static char tmp[24];
378 
379   if ( !buf )
380     buf = tmp;
381   if ( gen == GEN_UNDEF   ) return "GEN_UNDEF";
382   if ( gen == GEN_MAX     ) return "GEN_MAX";
383   if ( gen == GEN_PREHIST ) return "GEN_PREHIST";
384   if ( gen >= GEN_TBASE )
385   { int tid = (gen-GEN_TBASE)/GEN_TNEST;
386     gen_t r = (gen-GEN_TBASE)%GEN_TNEST;
387 
388     if ( r == GEN_TNEST-1 )
389       Ssprintf(buf, "T%d+GEN_TNEST", tid);
390     else
391       Ssprintf(buf, "T%d+%lld", tid, (int64_t)r);
392     return buf;
393   }
394   Ssprintf(buf, "%lld", (int64_t)gen);
395   return buf;
396 }
397 
398 
399 
400 int
alive_lifespan(query * q,lifespan * lifespan)401 alive_lifespan(query *q, lifespan *lifespan)
402 { DEBUG(2,
403 	{ char b[4][24];
404 
405 	  Sdprintf("q: rd_gen=%s; tr_gen=%s; t: %s..%s\n",
406 		   gen_name(q->rd_gen, b[0]),
407 		   gen_name(q->tr_gen, b[1]),
408 		   gen_name(lifespan->born, b[2]),
409 		   gen_name(lifespan->died, b[3]));
410 	});
411 
412   if ( q->rd_gen >= lifespan->born &&
413        q->rd_gen <  lifespan->died )
414   { if ( is_wr_transaction_gen(q, lifespan->died) &&
415 	 q->tr_gen >= lifespan->died )
416       return FALSE;
417 
418     return TRUE;
419   } else				/* created/died in transaction */
420   { if ( is_wr_transaction_gen(q, lifespan->born) )
421     { if ( q->tr_gen >= lifespan->born &&
422 	   q->tr_gen <  lifespan->died )
423 	return TRUE;
424     }
425   }
426 
427   return FALSE;
428 }
429 
430 
431 int
born_lifespan(query * q,lifespan * lifespan)432 born_lifespan(query *q, lifespan *lifespan)
433 { if ( q->rd_gen >= lifespan->born )
434     return TRUE;
435 
436   if ( is_wr_transaction_gen(q, lifespan->born) &&
437        q->tr_gen >= lifespan->born )
438     return TRUE;
439 
440   return FALSE;
441 }
442 
443 
444 		 /*******************************
445 		 *     TRIPLE MANIPULATION	*
446 		 *******************************/
447 
448 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
449 We have three basic triple manipulations:
450 
451   - Add triples
452   - Delete triples
453   - Updated triples (expressed as deleting and adding)
454 
455 add_triples() adds an array of  triples   to  the database, stepping the
456 database generation by 1. Calls to add triples must be synchronized with
457 other addition calls, but not  with   read  nor  delete operations. This
458 synchronization is needed because without we   cannot set the generation
459 for new queries to a proper value.
460 
461 To reduce the locked time, we perform this in multiple steps:
462 
463   - prelink_triple() performs tasks that do not affect the remainder of
464     the database.
465   - In the link-phase, we add the triples in packages of ADD_CHUNK_SIZE
466     to the database, but addressed in the far future.  No reader sees
    what we are doing.
468   - Next, we grab the generation_lock and update the triples to
469     the next generation and increment the generation to make them
470     visible.
  - Finally, we do some post-hoc work to update statistics.
472 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
473 
474 #define ADD_CHUNK_SIZE 50
475 
476 int
add_triples(query * q,triple ** triples,size_t count)477 add_triples(query *q, triple **triples, size_t count)
478 { rdf_db *db = q->db;
479   gen_t gen, gen_max;
480   triple **ep = triples+count;
481   triple **tp;
482 
483 					/* pre-lock phase */
484   for(tp=triples; tp < ep; tp++)
485     prelink_triple(db, *tp, q);
486   consider_triple_rehash(db, count);
487 
488 					/* Add the triples in the future */
489   gen_max = query_max_gen(q);
490   for(tp=triples; tp < ep; )
491   { triple **echunk = tp+50;
492 
493     if ( echunk > ep )
494       echunk = ep;
495 
496     simpleMutexLock(&db->queries.write.lock);
497     for(; tp<echunk; tp++)
498     { triple *t = *tp;
499 
500       t->lifespan.born = gen_max;
501       t->lifespan.died = gen_max;
502       link_triple(db, t, q);
503     }
504     simpleMutexUnlock(&db->queries.write.lock);
505   }
506 
507 					/* generation update */
508   simpleMutexLock(&db->queries.write.generation_lock);
509   gen = queryWriteGen(q)+1;
510   for(tp=triples; tp < ep; tp++)
511   { triple *t = *tp;
512 
513     t->lifespan.born = gen;
514   }
515   setWriteGen(q, gen);
516   simpleMutexUnlock(&db->queries.write.generation_lock);
517 
518   if ( q->transaction )
519   { for(tp=triples; tp < ep; tp++)
520     { triple *t = *tp;
521 
522       postlink_triple(db, t, q);
523       buffer_triple(q->transaction->transaction_data.added, t);
524     }
525   } else
526   { for(tp=triples; tp < ep; tp++)
527     { triple *t = *tp;
528 
529       postlink_triple(db, t, q);
530     }
531 
532     if ( rdf_is_broadcasting(EV_ASSERT|EV_ASSERT_LOAD) )
533     { for(tp=triples; tp < ep; tp++)
534       { triple *t = *tp;
535 	broadcast_id id = t->loaded ? EV_ASSERT_LOAD : EV_ASSERT;
536 
537 	if ( !rdf_broadcast(id, t, NULL) )
538 	  return FALSE;
539       }
540     }
541   }
542 
543   return TRUE;
544 }
545 
546 
547 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
548 del_triples() deletes triples from the database.  There are two actions:
549 
550   - del_triple_consequences() deletes (entailment) consequences of
551     erasing the triple.  Currently this is handling subPropertyOf
552     entailment.  This doesn't remove the triple, but merely invalidates
553     the subPropertyOf reachability matrix for subsequent generations.
554   - erase_triple() is called on the final commit and updates statistics.
555 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
556 
557 int
del_triples(query * q,triple ** triples,size_t count)558 del_triples(query *q, triple **triples, size_t count)
559 { rdf_db *db = q->db;
560   gen_t gen;
561   triple **ep = triples+count;
562   triple **tp;
563 
564   if ( count == 0 )
565     return TRUE;
566   else
567     rdf_create_gc_thread(db);
568 
569   simpleMutexLock(&db->queries.write.generation_lock);
570   simpleMutexLock(&db->queries.write.lock);
571   gen = queryWriteGen(q) + 1;
572 
573   for(tp=triples; tp < ep; tp++)
574   { triple *t = deref_triple(db, *tp);
575 
576     t->lifespan.died = gen;
577     del_triple_consequences(db, t, q);
578 
579     if ( q->transaction )
580       buffer_triple(q->transaction->transaction_data.deleted, t);
581     else
582       erase_triple(db, t, q);
583   }
584 
585   setWriteGen(q, gen);
586   simpleMutexUnlock(&db->queries.write.lock);
587   simpleMutexUnlock(&db->queries.write.generation_lock);
588 
589   if ( !q->transaction && rdf_is_broadcasting(EV_RETRACT) )
590   { for(tp=triples; tp < ep; tp++)
591     { triple *t = deref_triple(db, *tp);
592 
593       if ( !rdf_broadcast(EV_RETRACT, t, NULL) )
594 	return FALSE;
595     }
596   }
597 
598   return TRUE;
599 }
600 
601 
602 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
603 update_triples() updates an array of triples.
604 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
605 
606 int
update_triples(query * q,triple ** old,triple ** new,size_t count)607 update_triples(query *q,
608 	       triple **old, triple **new,
609 	       size_t count)
610 { rdf_db *db = q->db;
611   gen_t gen, gen_max;
612   triple **eo = old+count;
613   triple **en = new+count;
614   triple **to, **tn;
615   size_t updated = 0;
616 
617   if ( count == 0 )
618     return TRUE;
619   else
620     rdf_create_gc_thread(db);
621 
622   for(tn=new; tn < en; tn++)
623   { triple *t = *tn;
624 
625     if ( t )
626       prelink_triple(db, t, q);
627   }
628 
629   simpleMutexLock(&db->queries.write.generation_lock);
630   simpleMutexLock(&db->queries.write.lock);
631   gen = queryWriteGen(q) + 1;
632   gen_max = query_max_gen(q);
633 
634   for(to=old,tn=new; to < eo; to++,tn++)
635   { if ( *tn )
636     { triple *n = *tn;				/* new, cannot be reindexed */
637       triple *o = deref_triple(db, *to);
638 
639       o->lifespan.died = gen;
640       n->lifespan.born = gen;
641       n->lifespan.died = gen_max;
642       link_triple(db, *tn, q);
643       del_triple_consequences(db, o, q);
644       if ( q->transaction )
645       { buffer_triple(q->transaction->transaction_data.updated, *to);
646 	buffer_triple(q->transaction->transaction_data.updated, *tn);
647       } else
648       { erase_triple(db, *to, q);
649       }
650 
651       updated++;
652     }
653   }
654 
655   setWriteGen(q, gen);
656   simpleMutexUnlock(&db->queries.write.lock);
657   simpleMutexUnlock(&db->queries.write.generation_lock);
658 
659   consider_triple_rehash(db, 1);
660 
661   if ( !q->transaction && rdf_is_broadcasting(EV_UPDATE) )
662   { for(to=old,tn=new; to < eo; to++,tn++)
663     { triple *t = *tn;
664 
665       if ( t )
666       { postlink_triple(db, *tn, q);
667 
668 	if ( !rdf_broadcast(EV_UPDATE, *to, *tn) )
669 	  return FALSE;
670       }
671     }
672   } else
673   { for(tn=new; tn < en; tn++)
674     { triple *t = *tn;
675 
676       if ( t )
677 	postlink_triple(db, t, q);
678     }
679   }
680 
681   return TRUE;
682 }
683 
684 
685 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
686 Matrices  used  in  a  transaction  must  be  discarded  because  a  new
687 transaction will use the same  generation   numbers,  but  typically for
688 different modifications.
689 
690 TBD: Hand some statistics to  GC,  such   that  we  know  that there are
691 matrices to collect.
692 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
693 
694 static void
invalidate_lifespans_transaction(query * q)695 invalidate_lifespans_transaction(query *q)
696 { cell *c, *next;
697 
698   for(c=q->transaction_data.lifespans.head; c; c=next)
699   { lifespan *span = c->value;
700 
701     next = c->next;
702     span->died = GEN_PREHIST;
703     rdf_free(q->db, c, sizeof(*c));
704   }
705 
706   q->transaction_data.lifespans.head = NULL;
707   q->transaction_data.lifespans.tail = NULL;
708 }
709 
710 
711 void
close_transaction(query * q)712 close_transaction(query *q)
713 { assert(q->type == Q_TRANSACTION);
714 
715   free_triple_buffer(q->transaction_data.added);
716   free_triple_buffer(q->transaction_data.deleted);
717   free_triple_buffer(q->transaction_data.updated);
718   invalidate_lifespans_transaction(q);
719 
720   q->stack->transaction = q->transaction;
721 
722   close_query(q);
723 }
724 
725 
726 static void
commit_add(query * q,gen_t gen_max,gen_t gen,triple * t)727 commit_add(query *q, gen_t gen_max, gen_t gen, triple *t)
728 { t = deref_triple(q->db, t);
729 
730   if ( t->lifespan.died == gen_max )
731   { t->lifespan.born = gen;
732     add_triple_consequences(q->db, t, q);
733     if ( q->transaction )
734       buffer_triple(q->transaction->transaction_data.added, t);
735     else
736       t->lifespan.died = GEN_MAX;
737   }
738 }
739 
740 
741 static void
commit_del(query * q,gen_t gen,triple * t)742 commit_del(query *q, gen_t gen, triple *t)
743 { t = deref_triple(q->db, t);
744 
745   if ( is_wr_transaction_gen(q, t->lifespan.died) )
746   { t->lifespan.died = gen;
747     if ( q->transaction )
748     { del_triple_consequences(q->db, t, q);
749       buffer_triple(q->transaction->transaction_data.deleted, t);
750     } else
751     { erase_triple(q->db, t, q);
752     }
753   }
754 }
755 
756 
757 int
commit_transaction(query * q)758 commit_transaction(query *q)
759 { rdf_db *db = q->db;
760   triple **tp;
761   gen_t gen, gen_max;
762 
763   simpleMutexLock(&db->queries.write.generation_lock);
764   simpleMutexLock(&db->queries.write.lock);
765   gen = queryWriteGen(q) + 1;
766   gen_max = transaction_max_gen(q);
767 					/* added triples */
768   for(tp=q->transaction_data.added->base;
769       tp<q->transaction_data.added->top;
770       tp++)
771   { commit_add(q, gen_max, gen, *tp);
772   }
773 					/* deleted triples */
774   for(tp=q->transaction_data.deleted->base;
775       tp<q->transaction_data.deleted->top;
776       tp++)
777   { commit_del(q, gen, *tp);
778   }
779 
780 					/* updated triples */
781   for(tp=q->transaction_data.updated->base;
782       tp<q->transaction_data.updated->top;
783       tp += 2)
784   { triple *to = tp[0];
785     triple *tn = tp[1];
786 
787     commit_del(q, gen, to);
788     commit_add(q, gen_max, gen, tn);
789   }
790 
791   setWriteGen(q, gen);
792   simpleMutexUnlock(&db->queries.write.lock);
793   simpleMutexUnlock(&db->queries.write.generation_lock);
794 
795   q->stack->transaction = q->transaction; /* do not nest monitor calls */
796 					  /* inside the transaction */
797 
798 					/* Broadcast new triples */
799   if ( !q->transaction )
800   { if ( rdf_is_broadcasting(EV_RETRACT) )
801     { for(tp=q->transaction_data.deleted->base;
802 	  tp<q->transaction_data.deleted->top;
803 	  tp++)
804       { triple *t = *tp;
805 
806 	if ( t->lifespan.died == gen )
807 	{ if ( !rdf_broadcast(EV_RETRACT, t, NULL) )
808 	    return FALSE;
809 	}
810       }
811     }
812 
813     if ( rdf_is_broadcasting(EV_ASSERT|EV_ASSERT_LOAD) )
814     { for(tp=q->transaction_data.added->base;
815 	  tp<q->transaction_data.added->top;
816 	  tp++)
817       { triple *t = *tp;
818 
819 	if ( t->lifespan.born == gen )
820 	{ broadcast_id id = t->loaded ? EV_ASSERT_LOAD : EV_ASSERT;
821 
822 	  if ( !rdf_broadcast(id, t, NULL) )
823 	    return FALSE;
824 	}
825       }
826     }
827 
828     if ( rdf_is_broadcasting(EV_UPDATE) )
829     { for(tp=q->transaction_data.updated->base;
830 	  tp<q->transaction_data.updated->top;
831 	  tp += 2)
832       { triple *to = tp[0];
833 	triple *tn = tp[1];
834 
835 	if ( to->lifespan.died == gen &&
836 	     tn->lifespan.born == gen )
837 	{ if ( !rdf_broadcast(EV_UPDATE, to, tn) )
838 	    return FALSE;
839 	}
840       }
841     }
842   }
843 
844   close_transaction(q);
845 
846   return TRUE;
847 }
848 
849 
850 /* TBD: What if someone else deleted this triple too?  We can check
851    that by discovering multiple changes to the died generation.
852 */
853 
854 int
discard_transaction(query * q)855 discard_transaction(query *q)
856 { rdf_db *db = q->db;
857   triple **tp;
858   gen_t gen_max = transaction_max_gen(q);
859 
860   for(tp=q->transaction_data.added->base;
861       tp<q->transaction_data.added->top;
862       tp++)
863   { triple *t = *tp;
864 
865 					/* revert creation of new */
866     if ( is_wr_transaction_gen(q, t->lifespan.born) )
867     { t = deref_triple(db, t);
868       t->lifespan.died = GEN_PREHIST;
869       erase_triple(db, t, q);
870     }
871   }
872 
873   for(tp=q->transaction_data.deleted->base;
874       tp<q->transaction_data.deleted->top;
875       tp++)
876   { triple *t = *tp;
877 
878 					/* revert deletion of old */
879     if ( is_wr_transaction_gen(q, t->lifespan.died) )
880     { t = deref_triple(db, t);
881 
882       t->lifespan.died = gen_max;
883     }
884   }
885 
886   for(tp=q->transaction_data.updated->base;
887       tp<q->transaction_data.updated->top;
888       tp += 2)
889   { triple *to = tp[0];
890     triple *tn = tp[1];
891 
892 					/* revert deletion of old */
893     if ( is_wr_transaction_gen(q, to->lifespan.died) )
894     { to = deref_triple(db, to);
895 
896       to->lifespan.died = gen_max;
897     }
898 					/* revert creation of new */
899     if ( is_wr_transaction_gen(q, tn->lifespan.born) &&
900 	 tn->lifespan.died == gen_max )
901     { tn = deref_triple(db, tn);
902       tn->lifespan.died = GEN_PREHIST;
903       erase_triple(db, tn, q);
904     }
905   }
906 
907   close_transaction(q);
908 
909   return TRUE;
910 }
911