1 // Copyright 2009 The Archiveopteryx Developers <info@aox.org>
2
3 #include "helperrowcreator.h"
4
5 #include "dict.h"
6 #include "scope.h"
7 #include "allocator.h"
8 #include "transaction.h"
9 #include "address.h"
10 #include "query.h"
11 #include "flag.h"
12 #include "utf.h"
13
14
15
16 /*! \class HelperRowCreator helperrowcreator.h
17
18 The HelperRowCreator class contains common logic and some code to
19 add rows to the helper tables flag_names, annotation_names and
20 header_fields. It's inherited by one class per table.
21
22 In theory this could handle bodyparts and addresses, but I think
23 not. Those are different. Those tables grow to be big. These three
24 tables frequently contain less than one row per thousand messages,
25 so we need to optimise this class for inserting zero, one or at
26 most a few rows.
27 */
28
29
30 class HelperRowCreatorData
31 : public Garbage
32 {
33 public:
HelperRowCreatorData()34 HelperRowCreatorData()
35 : s( 0 ), c( 0 ), notify( 0 ), parent( 0 ), t( 0 ),
36 done( false ), inserted( false )
37 {}
38
39 Query * s;
40 Query * c;
41 Query * notify;
42 Transaction * parent;
43 Transaction * t;
44 EString n;
45 EString e;
46 bool done;
47 bool inserted;
48 Dict<uint> names;
49 };
50
51
/*! Constructs an empty HelperRowCreator referring to \a table, using
    \a transaction. If an error related to \a constraint occurs,
    execute() will roll back to a savepoint and try again.
*/
56
HelperRowCreator(const EString & table,Transaction * transaction,const EString & constraint)57 HelperRowCreator::HelperRowCreator( const EString & table,
58 Transaction * transaction,
59 const EString & constraint )
60 : EventHandler(), d( new HelperRowCreatorData )
61 {
62 setLog( new Log );
63 d->parent = transaction;
64 d->n = table + "_creator";
65 d->e = constraint;
66 }
67
68
69 /*! Returns true if this object is done with the Transaction, and
70 false if it will use the Transaction for one or more queries.
71 */
72
done() const73 bool HelperRowCreator::done() const
74 {
75 return d->done;
76 }
77
78
execute()79 void HelperRowCreator::execute()
80 {
81 Scope x( log() );
82 while ( !d->done ) {
83 // If we're waiting for the db, just go away.
84 if ( d->s && !d->s->done() )
85 return;
86 if ( d->c && !d->c->done() )
87 return;
88
89 // First, we select the rows whose IDs we need.
90 if ( !d->s && !d->c ) {
91 d->s = makeSelect();
92 if ( d->s ) {
93 // We don't know all we need, so issue a select.
94 if ( !d->t )
95 d->t = d->parent->subTransaction( this );
96 d->t->enqueue( d->s );
97 d->t->execute();
98 }
99 else {
100 // We do know everything, so we're done.
101 d->done = true;
102 }
103 }
104
105 // When the select is done, see if we need to copy into the table.
106 if ( d->s && d->s->done() && !d->c ) {
107 processSelect( d->s );
108 d->s = 0;
109 d->c = makeCopy();
110 if ( d->c ) {
111 // We do need to insert something.
112 d->t->enqueue( d->c );
113 EString ed = d->n;
114 ed.replace( "creator", "extended" );
115 Query * q = new Query( "notify " + ed, this );
116 d->t->enqueue( q );
117 d->t->execute();
118 d->inserted = true;
119 }
120 }
121
122 // If we need to insert something, look at the fate of the copy.
123 if ( d->c && d->c->done() ) {
124 Query * c = d->c;
125 d->c = 0;
126 if ( !c->failed() ) {
127 // We inserted, hit no race, and want to run another
128 // select to find the IDs.
129 }
130 else if ( c->error().contains( d->e ) ) {
131 // We inserted, but there was a race and we lost it.
132 d->t->restart();
133 }
134 else {
135 // Total failure. The Transaction is now in Failed
136 // state, and there's nothing we can do other. We just
137 // have to let our owner deal with it.
138 d->done = true;
139 }
140 }
141 }
142
143 if ( !d->t )
144 return;
145
146 postprocess( d->t );
147
148 Transaction * t = d->t;
149 d->t = 0;
150 t->commit();
151 }
152
153
154 /*! \fn Query * HelperRowCreator::makeSelect()
155
156 This pure virtual function is called to make a query to return the
157 IDs of rows already in the database, or of newly inserted rows.
158
159 If nothing needs to be done, the makeSelect() can return a null
160 pointer.
161
162 If makeSelect() returns non-null, the returned Query should have
163 this object as owner.
164 */
165
166
167 /*! This virtual function is called to process the result of the
168 makeSelect() Query. \a q is the Query returned by makeSelect()
169 (never 0).
170 */
171
processSelect(Query * q)172 void HelperRowCreator::processSelect( Query * q )
173 {
174 while ( q->hasResults() ) {
175 Row * r = q->nextRow();
176 add( r->getEString( "name" ), r->getInt( "id" ) );
177 }
178 }
179
180
181 /*! \fn Query * HelperRowCreator::makeCopy()
182
183 This pure virtual function is called to make a query to insert the
184 necessary rows to the table.
185
186 If nothing needs to be inserted, makeCopy() can return 0.
187
188 If makeCopy() returns non-null, the returned Query should have
189 this object as owner.
190 */
191
192
193 /*! Remembers that the given name \a s corresponds to the \a id. */
194
add(const EString & s,uint id)195 void HelperRowCreator::add( const EString & s, uint id )
196 {
197 uint * tmp = (uint *)Allocator::alloc( sizeof(uint), 0 );
198 *tmp = id;
199
200 d->names.insert( s.lower(), tmp );
201 }
202
203
204 /*! Returns the id stored earlier with add() for the name \a s. */
205
id(const EString & s)206 uint HelperRowCreator::id( const EString & s )
207 {
208 uint * p = d->names.find( s.lower() );
209 if ( p )
210 return *p;
211 return 0;
212 }
213
214
215 /*! Returns true if this creator inserted at least one row, and false
216 if lookup alone was enough to do the work.
217 */
218
inserted() const219 bool HelperRowCreator::inserted() const
220 {
221 return d->inserted;
222 }
223
224
225 /*! This virtual function is called just before \a t is committed.
226 */
227
postprocess(Transaction *)228 void HelperRowCreator::postprocess( Transaction * )
229 {
230
231 }
232
233
/*! \class FlagCreator helperrowcreator.h

    This class issues queries using a supplied Transaction to add new
    flags to the database.
*/
239
240
241 /*! Starts constructing the queries needed to create the flags specified
242 in \a f within the transaction \a t. This object will notify the
243 Transaction::owner() when it's done.
244
245 \a t will fail if flag creation fails for some reason (typically
246 bugs). Transaction::error() should say what went wrong.
247 */
248
FlagCreator(const EStringList & f,Transaction * t)249 FlagCreator::FlagCreator( const EStringList & f, Transaction * t )
250 : HelperRowCreator( "flag_names", t, "fn_uname" ),
251 names( f )
252 {
253 }
254
255
makeSelect()256 Query * FlagCreator::makeSelect()
257 {
258 Query * s = new Query( "select id, name from flag_names where "
259 "lower(name)=any($1::text[])", this );
260
261 EStringList sl;
262 EStringList::Iterator it( names );
263 while ( it ) {
264 EString name( *it );
265 if ( id( name ) == 0 && Flag::id( name ) == 0 )
266 sl.append( name.lower() );
267 ++it;
268 }
269
270 if ( sl.isEmpty() )
271 return 0;
272 s->bind( 1, sl );
273 log( "Looking up " + fn( sl.count() ) + " flags", Log::Debug );
274 return s;
275 }
276
277
makeCopy()278 Query * FlagCreator::makeCopy()
279 {
280 Query * c = new Query( "copy flag_names (name) from stdin with binary",
281 this );
282 uint count = 0;
283 EStringList::Iterator it( names );
284 while ( it ) {
285 if ( id( *it ) == 0 && Flag::id( *it ) == 0 ) {
286 c->bind( 1, *it );
287 c->submitLine();
288 count++;
289 }
290 ++it;
291 }
292
293 if ( !count )
294 return 0;
295 log( "Inserting " + fn( count ) + " new flags" );
296 return c;
297
298 }
299
300
301 /*! \fn EStringList * FlagCreator::allFlags()
302
303 Returns a pointer to a list of all flags known to this FlagCreator.
304 */
305
306
307 /*! \class FieldNameCreator helperrowcreator.h
308
309 The FieldNameCreator is a HelperRowCreator to insert rows into the
310 field_names table. Nothing particular.
311 */
312
313
/*! Creates an object to ensure that all entries in \a f are present
    in field_names, using \a tr for all its queries.
*/
317
318
FieldNameCreator(const EStringList & f,Transaction * tr)319 FieldNameCreator::FieldNameCreator( const EStringList & f,
320 Transaction * tr )
321 : HelperRowCreator( "field_names", tr, "field_names_name_key" ),
322 names( f )
323 {
324 }
325
326
makeSelect()327 Query * FieldNameCreator::makeSelect()
328 {
329 Query * q = new Query( "select id, name from field_names where "
330 "name=any($1::text[])", this );
331
332 EStringList sl;
333 EStringList::Iterator it( names );
334 while ( it ) {
335 if ( !id( *it ) )
336 sl.append( *it );
337 ++it;
338 }
339 if ( sl.isEmpty() )
340 return 0;
341 q->bind( 1, sl );
342 log( "Looking up " + fn( sl.count() ) + " field names", Log::Debug );
343 return q;
344 }
345
346
makeCopy()347 Query * FieldNameCreator::makeCopy()
348 {
349 Query * q = new Query( "copy field_names (name) from stdin with binary",
350 this );
351 EStringList::Iterator it( names );
352 uint count = 0;
353 while ( it ) {
354 if ( !id( *it ) ) {
355 q->bind( 1, *it );
356 q->submitLine();
357 count++;
358 }
359 ++it;
360 }
361
362 if ( !count )
363 return 0;
364 log( "Inserting " + fn( count ) + " new header field names" );
365 return q;
366 }
367
368
369 /*! \class AnnotationNameCreator helperrowcreator.h
370
371 The AnnotationNameCreator is a HelperRowCreator to insert rows into
372 the annotation_names table. Nothing particular.
373 */
374
375
/*! Creates an object to ensure that all entries in \a f are present
    in annotation_names, using \a t for all its queries.
*/
379
AnnotationNameCreator(const EStringList & f,Transaction * t)380 AnnotationNameCreator::AnnotationNameCreator( const EStringList & f,
381 Transaction * t )
382 : HelperRowCreator( "annotation_names", t, "annotation_names_name_key" ),
383 names( f )
384 {
385 }
386
makeSelect()387 Query * AnnotationNameCreator::makeSelect()
388 {
389 Query * q = new Query( "select id, name from annotation_names where "
390 "name=any($1::text[])", this );
391
392 EStringList sl;
393 EStringList::Iterator it( names );
394 while ( it ) {
395 EString name( *it );
396 if ( id( name ) == 0 )
397 sl.append( name );
398 ++it;
399 }
400 if ( sl.isEmpty() )
401 return 0;
402
403 q->bind( 1, sl );
404 log( "Looking up " + fn( sl.count() ) + " annotation names", Log::Debug );
405 return q;
406 }
407
408
makeCopy()409 Query * AnnotationNameCreator::makeCopy()
410 {
411 Query * q = new Query( "copy annotation_names (name) "
412 "from stdin with binary", this );
413 EStringList::Iterator it( names );
414 uint count = 0;
415 while ( it ) {
416 if ( id( *it ) == 0 ) {
417 count++;
418 q->bind( 1, *it );
419 q->submitLine();
420 }
421 ++it;
422 }
423
424 if ( !count )
425 return 0;
426 log( "Inserting " + fn( count ) + " new annotation names" );
427 return q;
428 }
429
430
431 /*! \class AddressCreator helperrowcreator.h
432
433 The AddressCreator ensures that a set of addresses exist in the
434 database and that their addresses are known.
435
436 You have to create an object, then execute it. It'll use a
437 subtransaction and implicitly block your transaction until the IDs
438 are known.
439 */
440
441
/*! Constructs an AddressCreator which will ensure that all the \a
    addresses have an Address::id(), using a subtransaction of \a t
    for its work.
*/
446
AddressCreator(Dict<Address> * addresses,Transaction * t)447 AddressCreator::AddressCreator( Dict<Address> * addresses,
448 Transaction * t )
449 : HelperRowCreator( "addresses", t, "addresses_nld_key" ),
450 a( addresses ), bulk( false ), decided( false ),
451 base( t ), sub( 0 ), insert( 0 ), obtain( 0 )
452 {
453 }
454
455
/*! Constructs an AddressCreator which will ensure that \a address has
    an Address::id(), using a subtransaction of \a t for its work.
*/
459
AddressCreator(Address * address,class Transaction * t)460 AddressCreator::AddressCreator( Address * address, class Transaction * t )
461 : HelperRowCreator( "addresses", t, "addresses_nld_key" ),
462 a( new Dict<Address> ), bulk( false ), decided( false ),
463 base( t ), sub( 0 ), insert( 0 ), obtain( 0 )
464 {
465 a->insert( AddressCreator::key( address ), address );
466 }
467
468
/*! Constructs an AddressCreator which will ensure that all the \a
    addresses have an Address::id(), using a subtransaction of \a t
    for its work.
*/
473
474
AddressCreator(List<Address> * addresses,class Transaction * t)475 AddressCreator::AddressCreator( List<Address> * addresses,
476 class Transaction * t )
477 : HelperRowCreator( "addresses", t, "addresses_nld_key" ),
478 a( new Dict<Address> ), bulk( false ), decided( false ),
479 base( t ), sub( 0 ), insert( 0 ), obtain( 0 )
480 {
481 List<Address>::Iterator address( addresses );
482 while ( address ) {
483 a->insert( AddressCreator::key( address ), address );
484 ++address;
485 }
486 }
487
488
489 /*! This private helper looks for \a s in \a b, inserts it if not
490 present, and returns its number. Uses \a n to generate a new
491 unique number if necessary, and binds \a s to \a n in \a q.
492
493 \a b has to use key() as key for each member. Nothing will work if
494 you break this rule. This sounds a little fragile, but I can't
495 think of a good alternative right now.
496 */
497
param(Dict<uint> * b,const EString & s,uint & n,Query * q)498 uint AddressCreator::param( Dict<uint> * b, const EString & s,
499 uint & n,
500 Query * q )
501 {
502 uint * r = b->find( s );
503 if ( !r ) {
504 r = (uint*)Allocator::alloc( sizeof( uint ), 0 );
505 *r = n++;
506 b->insert( s, r );
507 q->bind( *r, s );
508 }
509 return *r;
510 }
511
512
513 /*! Creates a select to look for as many addresses as possible, but
514 binding no more than 128 strings.
515 */
516
makeSelect()517 Query * AddressCreator::makeSelect()
518 {
519 EString s = "select id, name, localpart::text, domain::text "
520 "from addresses where ";
521 Query * q = new Query( "", this );
522 uint n = 1;
523 Dict<uint> binds;
524 PgUtf8Codec p;
525 bool first = true;
526 Dict<Address>::Iterator i( a );
527 asked.clear();
528 while ( i && n < 128 ) {
529 if ( !i->id() ) {
530 EString name( p.fromUnicode( i->uname() ) );
531 EString lp( i->localpart().utf8() );
532 EString dom( i->domain().utf8() );
533
534 uint bn = param( &binds, name, n, q );
535 uint bl = param( &binds, lp, n, q );
536 uint bd = param( &binds, dom, n, q );
537
538 if ( !first )
539 s.append( " or " );
540 first = false;
541 s.append( "(name=$" );
542 s.appendNumber( bn );
543 s.append( " and localpart=$" );
544 s.appendNumber( bl );
545 s.append( "::citext and domain=$" );
546 s.appendNumber( bd );
547 s.append( "::citext)" );
548
549 asked.append( i );
550 }
551 ++i;
552 }
553 if ( asked.isEmpty() )
554 return 0;
555 q->setString( s );
556 log( "Looking up " + fn( asked.count() ) + " addresses", Log::Debug );
557 return q;
558 }
559
560
processSelect(Query * q)561 void AddressCreator::processSelect( Query * q )
562 {
563 while ( q->hasResults() ) {
564 Row * r = q->nextRow();
565 Address * c =
566 new Address( r->getUString( "name" ),
567 r->getUString( "localpart" ),
568 r->getUString( "domain" ) );
569 Address * our = a->find( key( c ) );
570 if ( our )
571 our->setId( r->getInt( "id" ) );
572 else
573 log( "Unexpected result from db: " + c->toString( false ) );
574 }
575 }
576
577
makeCopy()578 Query * AddressCreator::makeCopy()
579 {
580 uint count = 0;
581 Query * q = new Query( "copy addresses (name,localpart,domain) "
582 "from stdin with binary", this );
583 List<Address>::Iterator i( asked );
584 while ( i ) {
585 if ( !i->id() ) {
586 q->bind( 1, i->uname() );
587 q->bind( 2, i->localpart() );
588 q->bind( 3, i->domain() );
589 q->submitLine();
590 count++;
591 }
592 ++i;
593 }
594 if ( !count )
595 return 0;
596 log( "Inserting " + fn( count ) + " new addresses" );
597 return q;
598 }
599
600
/*! Returns an EString derived from \a a in a unique fashion. Two
    addresses that are the same according to the RFC rules have the
    same key().
*/
605
key(Address * a)606 EString AddressCreator::key( Address * a )
607 {
608 EString r;
609 r.append( a->domain().utf8().lower() );
610 r.append( '\0' );
611 r.append( a->localpart().utf8().lower() );
612 r.append( '\0' );
613 r.append( a->uname().utf8() );
614 return r;
615 }
616
617
// Number of addresses without an ID at which AddressCreator::execute()
// switches from the generic select/copy loop to the temporary-table
// path. Where the real crossover point lies is an open question.
// Declared const because nothing in this file ever assigns to it.

static const uint useTempTable = 30;
622
623
624 /*! This overloads HelperRowCreator::execute() and conditionally
625 replaces its state machine with one that's faster for large
626 address sets.
627 */
628
execute()629 void AddressCreator::execute()
630 {
631 Scope x( log() );
632 if ( !decided ) {
633 uint c = 0;
634 Dict<Address>::Iterator i( a );
635 while ( c < useTempTable && i ) {
636 if ( !i->id() )
637 ++c;
638 ++i;
639 }
640 if ( c >= useTempTable )
641 bulk = true;
642 decided = true;
643 }
644
645 if ( !bulk ) {
646 HelperRowCreator::execute();
647 return;
648 }
649
650 if ( !sub ) {
651 base->enqueue( new Query( "create temporary table na ("
652 "id integer, "
653 "f boolean, "
654 "name text, "
655 "localpart citext, "
656 "domain citext )", 0 ) );
657 Query * q = new Query( "copy na (id, f, name,localpart,domain) "
658 "from stdin with binary", this );
659 Dict<Address>::Iterator i( a );
660 while ( i ) {
661 if ( !i->id() ) {
662 q->bind( 1, 0 );
663 q->bind( 2, false );
664 q->bind( 3, i->uname() );
665 q->bind( 4, i->localpart() );
666 q->bind( 5, i->domain() );
667 q->submitLine();
668 }
669 ++i;
670 }
671 base->enqueue( q );
672
673 sub = base->subTransaction( this );
674 }
675
676 if ( insert && insert->failed() ) {
677 sub->restart();
678 insert = 0;
679 }
680
681 if ( !insert ) {
682 sub->enqueue(
683 new Query(
684 "update na set f=true, id=a.id from addresses a "
685 "where na.localpart=a.localpart "
686 "and na.domain=a.domain "
687 "and na.name=a.name "
688 "and not f", 0 ) );
689 sub->enqueue(
690 new Query(
691 "update na "
692 "set id=nextval(pg_get_serial_sequence('addresses','id')) "
693 "where id = 0 and not f", 0 ) );
694 insert =
695 new Query(
696 "insert into addresses "
697 "(id, name, localpart, domain) "
698 "select id, name, localpart, domain "
699 "from na where not f", this );
700 sub->enqueue( insert );
701 sub->execute();
702 }
703
704 if ( !insert->done() )
705 return;
706
707 if ( !obtain ) {
708 obtain = new Query( "select id, name, localpart::text, domain::text "
709 "from na", this );
710 sub->enqueue( obtain );
711 sub->enqueue( new Query( "drop table na", 0 ) );
712 sub->commit();
713 }
714
715 processSelect( obtain );
716 }
717
718
/*! \class ThreadRootCreator helperrowcreator.h

    The ThreadRootCreator class creates thread_roots rows. The only
    particular here is that id() works on all the message-ids, not
    just the root ids.
*/
725
726 /*! Constructs a ThreadRootCreator that will make sure that the
727 messages in \a l are all threadable, using a subtransaction of \a
728 t for all db work.
729 */
730
ThreadRootCreator(List<ThreadRootCreator::Message> * l,Transaction * t)731 ThreadRootCreator::ThreadRootCreator( List<ThreadRootCreator::Message> * l,
732 Transaction * t )
733 : HelperRowCreator( "thread_roots", t, "thread_roots_messageid_key" ),
734 messages( l ), nodes( new Dict<ThreadNode> ), first( true )
735 {
736 List<Message>::Iterator m( messages );
737 while ( m ) {
738 EStringList l = m->references();
739 l.append( m->messageId() );
740 ++m;
741 EStringList::Iterator s( l );
742 ThreadNode * root = 0;
743 while ( s ) {
744 if ( !s->isEmpty() ) {
745 ThreadNode * n = nodes->find( *s );
746 if ( !n ) {
747 n = new ThreadNode( *s );
748 nodes->insert( *s, n );
749 }
750 while ( n->parent )
751 n = n->parent;
752 if ( n == root )
753 ;
754 else if ( root )
755 n->parent = root;
756 else
757 root = n;
758 }
759 ++s;
760 }
761 }
762 }
763
764
makeSelect()765 Query * ThreadRootCreator::makeSelect()
766 {
767 Query * q = 0;
768 EStringList l;
769 Dict<ThreadNode>::Iterator i( nodes );
770 while ( i ) {
771 if ( first || ( !i->parent && !i->trid ) )
772 l.append( i->id );
773 ++i;
774 }
775 if ( first ) // look for IDs in messages.thread_root - once
776 q = new Query( "select id, messageid as name from thread_roots "
777 "where messageid=any($1::text[]) "
778 "union "
779 "select m.thread_root as id, hf.value as name "
780 "from messages m join header_fields hf on "
781 "(m.id=hf.message and hf.field=13) "
782 "where hf.value=any($1::text[]) "
783 "and m.thread_root is not null",
784 this );
785 else
786 q = new Query( "select id, messageid as name from thread_roots "
787 "where messageid=any($1::text[])",
788 this );
789 first = false;
790 if ( l.isEmpty() )
791 return 0;
792 q->bind( 1, l );
793 return q;
794 }
795
796
makeCopy()797 Query * ThreadRootCreator::makeCopy()
798 {
799 Query * q = new Query( "copy thread_roots( messageid ) "
800 "from stdin with binary", 0 );
801 Dict<ThreadNode>::Iterator i( nodes );
802 while ( i ) {
803 if ( !i->parent && !i->trid ) {
804 q->bind( 1, i->id );
805 q->submitLine();
806 }
807 ++i;
808 }
809 return q;
810 }
811
812
id(const EString & id)813 uint ThreadRootCreator::id( const EString & id )
814 {
815 ThreadNode * n = nodes->find( id );
816 if ( !n )
817 return 0;
818 while ( n->parent )
819 n = n->parent;
820 return n->trid;
821 }
822
823
add(const EString & id,uint i)824 void ThreadRootCreator::add( const EString & id, uint i )
825 {
826 ThreadNode * n = nodes->find( id );
827 if ( !n ) {
828 n = new ThreadNode( id );
829 nodes->insert( id, n );
830 }
831 n->trid = i;
832 }
833
834
postprocess(Transaction * t)835 void ThreadRootCreator::postprocess( Transaction * t )
836 {
837 Dict<ThreadNode>::Iterator i( nodes );
838 IntegerSet merged;
839 while ( i ) {
840 if ( i->parent && i->trid ) {
841 ThreadNode * p = i->parent;
842 while ( p->parent )
843 p = p->parent;
844 if ( i->trid != p->trid ) {
845 int big = i->trid;
846 int small = p->trid;
847 if ( big < small ) {
848 big = p->trid;
849 small = i->trid;
850 }
851 p->trid = small;
852 if ( !merged.contains( big ) ) {
853 Query * q = new Query( "select merge_threads( $1, $2 )", 0 );
854 q->bind( 1, small );
855 q->bind( 2, big );
856 t->enqueue( q );
857 }
858 }
859 }
860 ++i;
861 }
862
863 }
864