1 // Copyright 2009 The Archiveopteryx Developers <info@aox.org>
2 
3 #include "helperrowcreator.h"
4 
5 #include "dict.h"
6 #include "scope.h"
7 #include "allocator.h"
8 #include "transaction.h"
9 #include "address.h"
10 #include "query.h"
11 #include "flag.h"
12 #include "utf.h"
13 
14 
15 
16 /*! \class HelperRowCreator helperrowcreator.h
17 
18     The HelperRowCreator class contains common logic and some code to
19     add rows to the helper tables flag_names, annotation_names and
20     header_fields. It's inherited by one class per table.
21 
22     In theory this could handle bodyparts and addresses, but I think
23     not. Those are different. Those tables grow to be big. These three
24     tables frequently contain less than one row per thousand messages,
25     so we need to optimise this class for inserting zero, one or at
26     most a few rows.
27 */
28 
29 
30 class HelperRowCreatorData
31     : public Garbage
32 {
33 public:
HelperRowCreatorData()34     HelperRowCreatorData()
35         : s( 0 ), c( 0 ), notify( 0 ), parent( 0 ), t( 0 ),
36           done( false ), inserted( false )
37     {}
38 
39     Query * s;
40     Query * c;
41     Query * notify;
42     Transaction * parent;
43     Transaction * t;
44     EString n;
45     EString e;
46     bool done;
47     bool inserted;
48     Dict<uint> names;
49 };
50 
51 
52 /*!  Constructs an empty HelperRowCreator refering to \a table, using
53      \a transaction. If an error related to \a constraint occurs,
54      execute() will roll back to a savepoint and try again.
55 */
56 
HelperRowCreator(const EString & table,Transaction * transaction,const EString & constraint)57 HelperRowCreator::HelperRowCreator( const EString & table,
58                                     Transaction * transaction,
59                                     const EString & constraint )
60     : EventHandler(), d( new HelperRowCreatorData )
61 {
62     setLog( new Log );
63     d->parent = transaction;
64     d->n = table + "_creator";
65     d->e = constraint;
66 }
67 
68 
69 /*! Returns true if this object is done with the Transaction, and
70     false if it will use the Transaction for one or more queries.
71 */
72 
done() const73 bool HelperRowCreator::done() const
74 {
75     return d->done;
76 }
77 
78 
execute()79 void HelperRowCreator::execute()
80 {
81     Scope x( log() );
82     while ( !d->done ) {
83         // If we're waiting for the db, just go away.
84         if ( d->s && !d->s->done() )
85             return;
86         if ( d->c && !d->c->done() )
87             return;
88 
89         // First, we select the rows whose IDs we need.
90         if ( !d->s && !d->c ) {
91             d->s = makeSelect();
92             if ( d->s ) {
93                 // We don't know all we need, so issue a select.
94                 if ( !d->t )
95                     d->t = d->parent->subTransaction( this );
96                 d->t->enqueue( d->s );
97                 d->t->execute();
98             }
99             else {
100                 // We do know everything, so we're done.
101                 d->done = true;
102             }
103         }
104 
105         // When the select is done, see if we need to copy into the table.
106         if ( d->s && d->s->done() && !d->c ) {
107             processSelect( d->s );
108             d->s = 0;
109             d->c = makeCopy();
110             if ( d->c ) {
111                 // We do need to insert something.
112                 d->t->enqueue( d->c );
113                 EString ed = d->n;
114                 ed.replace( "creator", "extended" );
115                 Query * q = new Query( "notify " + ed, this );
116                 d->t->enqueue( q );
117                 d->t->execute();
118                 d->inserted = true;
119             }
120         }
121 
122         // If we need to insert something, look at the fate of the copy.
123         if ( d->c && d->c->done() ) {
124             Query * c = d->c;
125             d->c = 0;
126             if ( !c->failed() ) {
127                 // We inserted, hit no race, and want to run another
128                 // select to find the IDs.
129             }
130             else if ( c->error().contains( d->e ) ) {
131                 // We inserted, but there was a race and we lost it.
132                 d->t->restart();
133             }
134             else {
135                 // Total failure. The Transaction is now in Failed
136                 // state, and there's nothing we can do other. We just
137                 // have to let our owner deal with it.
138                 d->done = true;
139             }
140         }
141     }
142 
143     if ( !d->t )
144         return;
145 
146     postprocess( d->t );
147 
148     Transaction * t = d->t;
149     d->t = 0;
150     t->commit();
151 }
152 
153 
154 /*! \fn Query * HelperRowCreator::makeSelect()
155 
156     This pure virtual function is called to make a query to return the
157     IDs of rows already in the database, or of newly inserted rows.
158 
    If nothing needs to be done, makeSelect() can return a null
    pointer.
161 
162     If makeSelect() returns non-null, the returned Query should have
163     this object as owner.
164  */
165 
166 
167 /*! This virtual function is called to process the result of the
168     makeSelect() Query. \a q is the Query returned by makeSelect()
169     (never 0).
170  */
171 
processSelect(Query * q)172 void HelperRowCreator::processSelect( Query * q )
173 {
174     while ( q->hasResults() ) {
175         Row * r = q->nextRow();
176         add( r->getEString( "name" ), r->getInt( "id" ) );
177     }
178 }
179 
180 
181 /*! \fn Query * HelperRowCreator::makeCopy()
182 
183     This pure virtual function is called to make a query to insert the
184     necessary rows to the table.
185 
186     If nothing needs to be inserted, makeCopy() can return 0.
187 
188     If makeCopy() returns non-null, the returned Query should have
189     this object as owner.
190  */
191 
192 
193 /*! Remembers that the given name \a s corresponds to the \a id. */
194 
add(const EString & s,uint id)195 void HelperRowCreator::add( const EString & s, uint id )
196 {
197     uint * tmp = (uint *)Allocator::alloc( sizeof(uint), 0 );
198     *tmp = id;
199 
200     d->names.insert( s.lower(), tmp );
201 }
202 
203 
204 /*! Returns the id stored earlier with add() for the name \a s. */
205 
id(const EString & s)206 uint HelperRowCreator::id( const EString & s )
207 {
208     uint * p = d->names.find( s.lower() );
209     if ( p )
210         return *p;
211     return 0;
212 }
213 
214 
215 /*! Returns true if this creator inserted at least one row, and false
216     if lookup alone was enough to do the work.
217 */
218 
inserted() const219 bool HelperRowCreator::inserted() const
220 {
221     return d->inserted;
222 }
223 
224 
225 /*! This virtual function is called just before \a t is committed.
226 */
227 
postprocess(Transaction *)228 void HelperRowCreator::postprocess( Transaction * )
229 {
230 
231 }
232 
233 
234 /*! \class FlagCreator helperrowcreator.h
235 
    This class issues queries using a supplied Transaction to add new
    flags to the database.
238 */
239 
240 
241 /*! Starts constructing the queries needed to create the flags specified
242     in \a f within the transaction \a t. This object will notify the
243     Transaction::owner() when it's done.
244 
245     \a t will fail if flag creation fails for some reason (typically
246     bugs). Transaction::error() should say what went wrong.
247 */
248 
FlagCreator(const EStringList & f,Transaction * t)249 FlagCreator::FlagCreator( const EStringList & f, Transaction * t )
250     : HelperRowCreator( "flag_names", t, "fn_uname" ),
251       names( f )
252 {
253 }
254 
255 
makeSelect()256 Query * FlagCreator::makeSelect()
257 {
258     Query * s = new Query( "select id, name from flag_names where "
259                            "lower(name)=any($1::text[])", this );
260 
261     EStringList sl;
262     EStringList::Iterator it( names );
263     while ( it ) {
264         EString name( *it );
265         if ( id( name ) == 0 && Flag::id( name ) == 0 )
266             sl.append( name.lower() );
267         ++it;
268     }
269 
270     if ( sl.isEmpty() )
271         return 0;
272     s->bind( 1, sl );
273     log( "Looking up " + fn( sl.count() ) + " flags", Log::Debug );
274     return s;
275 }
276 
277 
makeCopy()278 Query * FlagCreator::makeCopy()
279 {
280     Query * c = new Query( "copy flag_names (name) from stdin with binary",
281                            this );
282     uint count = 0;
283     EStringList::Iterator it( names );
284     while ( it ) {
285         if ( id( *it ) == 0 && Flag::id( *it ) == 0 ) {
286             c->bind( 1, *it );
287             c->submitLine();
288             count++;
289         }
290         ++it;
291     }
292 
293     if ( !count )
294         return 0;
295     log( "Inserting " + fn( count ) + " new flags" );
296     return c;
297 
298 }
299 
300 
301 /*! \fn EStringList * FlagCreator::allFlags()
302 
303     Returns a pointer to a list of all flags known to this FlagCreator.
304 */
305 
306 
307 /*! \class FieldNameCreator helperrowcreator.h
308 
309     The FieldNameCreator is a HelperRowCreator to insert rows into the
310     field_names table. Nothing particular.
311 */
312 
313 
314 /*! Creates an object to ensure that all entries in \a f are present
315     in field_names, using \a tr for all its queryies.
316 */
317 
318 
FieldNameCreator(const EStringList & f,Transaction * tr)319 FieldNameCreator::FieldNameCreator( const EStringList & f,
320                                     Transaction * tr )
321     : HelperRowCreator( "field_names", tr,  "field_names_name_key" ),
322       names( f )
323 {
324 }
325 
326 
makeSelect()327 Query * FieldNameCreator::makeSelect()
328 {
329     Query * q = new Query( "select id, name from field_names where "
330                            "name=any($1::text[])", this );
331 
332     EStringList sl;
333     EStringList::Iterator it( names );
334     while ( it ) {
335         if ( !id( *it ) )
336             sl.append( *it );
337         ++it;
338     }
339     if ( sl.isEmpty() )
340         return 0;
341     q->bind( 1, sl );
342     log( "Looking up " + fn( sl.count() ) + " field names", Log::Debug );
343     return q;
344 }
345 
346 
makeCopy()347 Query * FieldNameCreator::makeCopy()
348 {
349     Query * q = new Query( "copy field_names (name) from stdin with binary",
350                            this );
351     EStringList::Iterator it( names );
352     uint count = 0;
353     while ( it ) {
354         if ( !id( *it ) ) {
355             q->bind( 1, *it );
356             q->submitLine();
357             count++;
358         }
359         ++it;
360     }
361 
362     if ( !count )
363         return 0;
364     log( "Inserting " + fn( count ) + " new header field names" );
365     return q;
366 }
367 
368 
369 /*! \class AnnotationNameCreator helperrowcreator.h
370 
371     The AnnotationNameCreator is a HelperRowCreator to insert rows into
372     the annotation_names table. Nothing particular.
373 */
374 
375 
376 /*! Creates an object to ensure that all entries in \a f are present
377     in annotation_names, using \a t for all its queryies.
378 */
379 
AnnotationNameCreator(const EStringList & f,Transaction * t)380 AnnotationNameCreator::AnnotationNameCreator( const EStringList & f,
381                                               Transaction * t )
382     : HelperRowCreator( "annotation_names", t, "annotation_names_name_key" ),
383       names( f )
384 {
385 }
386 
makeSelect()387 Query *  AnnotationNameCreator::makeSelect()
388 {
389     Query * q = new Query( "select id, name from annotation_names where "
390                            "name=any($1::text[])", this );
391 
392     EStringList sl;
393     EStringList::Iterator it( names );
394     while ( it ) {
395         EString name( *it );
396         if ( id( name ) == 0 )
397             sl.append( name );
398         ++it;
399     }
400     if ( sl.isEmpty() )
401         return 0;
402 
403     q->bind( 1, sl );
404     log( "Looking up " + fn( sl.count() ) + " annotation names", Log::Debug );
405     return q;
406 }
407 
408 
makeCopy()409 Query * AnnotationNameCreator::makeCopy()
410 {
411     Query * q = new Query( "copy annotation_names (name) "
412                            "from stdin with binary", this );
413     EStringList::Iterator it( names );
414     uint count = 0;
415     while ( it ) {
416         if ( id( *it ) == 0 ) {
417             count++;
418             q->bind( 1, *it );
419             q->submitLine();
420         }
421         ++it;
422     }
423 
424     if ( !count )
425         return 0;
426     log( "Inserting " + fn( count ) + " new annotation names" );
427     return q;
428 }
429 
430 
431 /*! \class AddressCreator helperrowcreator.h
432 
    The AddressCreator ensures that a set of addresses exist in the
    database and that their IDs are known.
435 
436     You have to create an object, then execute it. It'll use a
437     subtransaction and implicitly block your transaction until the IDs
438     are known.
439 */
440 
441 
442 /*! Constructs an AddressCreator which will ensure that all the \a
443     addresses have an Address::id(), using a subtransaction if \a t
444     for its work.
445 */
446 
AddressCreator(Dict<Address> * addresses,Transaction * t)447 AddressCreator::AddressCreator( Dict<Address> * addresses,
448                                 Transaction * t )
449     : HelperRowCreator( "addresses", t, "addresses_nld_key" ),
450       a( addresses ), bulk( false ), decided( false ),
451       base( t ), sub( 0 ), insert( 0 ), obtain( 0 )
452 {
453 }
454 
455 
456 /*! Constructs an AddressCreator which will ensure that \a address has
457     an Address::id(), using a subtransaction if \a t for its work.
458 */
459 
AddressCreator(Address * address,class Transaction * t)460 AddressCreator::AddressCreator( Address * address, class Transaction * t )
461     : HelperRowCreator( "addresses", t, "addresses_nld_key" ),
462       a( new Dict<Address> ), bulk( false ), decided( false ),
463       base( t ), sub( 0 ), insert( 0 ), obtain( 0 )
464 {
465     a->insert( AddressCreator::key( address ), address );
466 }
467 
468 
469 /*! Constructs an AddressCreator which will ensure that all the \a
470     addresses have an Address::id(), using a subtransaction if \a t
471     for its work.
472 */
473 
474 
AddressCreator(List<Address> * addresses,class Transaction * t)475 AddressCreator::AddressCreator( List<Address> * addresses,
476                                 class Transaction * t )
477     : HelperRowCreator( "addresses", t, "addresses_nld_key" ),
478       a( new Dict<Address> ), bulk( false ), decided( false ),
479       base( t ), sub( 0 ), insert( 0 ), obtain( 0 )
480 {
481     List<Address>::Iterator address( addresses );
482     while ( address ) {
483         a->insert( AddressCreator::key( address ), address );
484         ++address;
485     }
486 }
487 
488 
489 /*! This private helper looks for \a s in \a b, inserts it if not
490     present, and returns its number. Uses \a n to generate a new
491     unique number if necessary, and binds \a s to \a n in \a q.
492 
493     \a b has to use key() as key for each member. Nothing will work if
494     you break this rule. This sounds a little fragile, but I can't
495     think of a good alternative right now.
496 */
497 
param(Dict<uint> * b,const EString & s,uint & n,Query * q)498 uint AddressCreator::param( Dict<uint> * b, const EString & s,
499                             uint & n,
500                             Query * q )
501 {
502     uint * r = b->find( s );
503     if ( !r ) {
504         r = (uint*)Allocator::alloc( sizeof( uint ), 0 );
505         *r = n++;
506         b->insert( s, r );
507         q->bind( *r, s );
508     }
509     return *r;
510 }
511 
512 
513 /*! Creates a select to look for as many addresses as possible, but
514     binding no more than 128 strings.
515 */
516 
makeSelect()517 Query * AddressCreator::makeSelect()
518 {
519     EString s = "select id, name, localpart::text, domain::text "
520                 "from addresses where ";
521     Query * q = new Query( "", this );
522     uint n = 1;
523     Dict<uint> binds;
524     PgUtf8Codec p;
525     bool first = true;
526     Dict<Address>::Iterator i( a );
527     asked.clear();
528     while ( i && n < 128 ) {
529         if ( !i->id() ) {
530             EString name( p.fromUnicode( i->uname() ) );
531             EString lp( i->localpart().utf8() );
532             EString dom( i->domain().utf8() );
533 
534             uint bn = param( &binds, name, n, q );
535             uint bl = param( &binds, lp, n, q );
536             uint bd = param( &binds, dom, n, q );
537 
538             if ( !first )
539                 s.append( " or " );
540             first = false;
541             s.append( "(name=$" );
542             s.appendNumber( bn );
543             s.append( " and localpart=$" );
544             s.appendNumber( bl );
545             s.append( "::citext and domain=$" );
546             s.appendNumber( bd );
547             s.append( "::citext)" );
548 
549             asked.append( i );
550         }
551         ++i;
552     }
553     if ( asked.isEmpty() )
554         return 0;
555     q->setString( s );
556     log( "Looking up " + fn( asked.count() ) + " addresses", Log::Debug );
557     return q;
558 }
559 
560 
processSelect(Query * q)561 void AddressCreator::processSelect( Query * q )
562 {
563     while ( q->hasResults() ) {
564         Row * r = q->nextRow();
565         Address * c =
566             new Address( r->getUString( "name" ),
567                          r->getUString( "localpart" ),
568                          r->getUString( "domain" ) );
569         Address * our = a->find( key( c ) );
570         if ( our )
571             our->setId( r->getInt( "id" ) );
572         else
573             log( "Unexpected result from db: " + c->toString( false ) );
574     }
575 }
576 
577 
makeCopy()578 Query * AddressCreator::makeCopy()
579 {
580     uint count = 0;
581     Query * q = new Query( "copy addresses (name,localpart,domain) "
582                            "from stdin with binary", this );
583     List<Address>::Iterator i( asked );
584     while ( i ) {
585         if ( !i->id() ) {
586             q->bind( 1, i->uname() );
587             q->bind( 2, i->localpart() );
588             q->bind( 3, i->domain() );
589             q->submitLine();
590             count++;
591         }
592         ++i;
593     }
594     if ( !count )
595         return 0;
596     log( "Inserting " + fn( count ) + " new addresses" );
597     return q;
598 }
599 
600 
601 /*! Returns a EString derived from \a a in a unique fashion. Two
602     addresses that are the same according to the RFC rules have the
603     same key().
604 */
605 
key(Address * a)606 EString AddressCreator::key( Address * a )
607 {
608     EString r;
609     r.append( a->domain().utf8().lower() );
610     r.append( '\0' );
611     r.append( a->localpart().utf8().lower() );
612     r.append( '\0' );
613     r.append( a->uname().utf8() );
614     return r;
615 }
616 
617 
// This constant decides when AddressCreator::execute() changes to
// using the temporary table: it does so when at least this many
// addresses lack an id. Where's the real crossover point?

static uint useTempTable = 30;
622 
623 
624 /*! This overloads HelperRowCreator::execute() and conditionally
625     replaces its state machine with one that's faster for large
626     address sets.
627 */
628 
execute()629 void AddressCreator::execute()
630 {
631     Scope x( log() );
632     if ( !decided ) {
633         uint c = 0;
634         Dict<Address>::Iterator i( a );
635         while ( c < useTempTable && i ) {
636             if ( !i->id() )
637                 ++c;
638             ++i;
639         }
640         if ( c >= useTempTable )
641             bulk = true;
642         decided = true;
643     }
644 
645     if ( !bulk ) {
646         HelperRowCreator::execute();
647         return;
648     }
649 
650     if ( !sub ) {
651         base->enqueue( new Query( "create temporary table na ("
652                                   "id integer, "
653                                   "f boolean, "
654                                   "name text, "
655                                   "localpart citext, "
656                                   "domain citext )", 0 ) );
657         Query * q = new Query( "copy na (id, f, name,localpart,domain) "
658                                "from stdin with binary", this );
659         Dict<Address>::Iterator i( a );
660         while ( i ) {
661             if ( !i->id() ) {
662                 q->bind( 1, 0 );
663                 q->bind( 2, false );
664                 q->bind( 3, i->uname() );
665                 q->bind( 4, i->localpart() );
666                 q->bind( 5, i->domain() );
667                 q->submitLine();
668             }
669             ++i;
670         }
671         base->enqueue( q );
672 
673         sub = base->subTransaction( this );
674     }
675 
676     if ( insert && insert->failed() ) {
677         sub->restart();
678         insert = 0;
679     }
680 
681     if ( !insert ) {
682         sub->enqueue(
683             new Query(
684                 "update na set f=true, id=a.id from addresses a "
685                 "where na.localpart=a.localpart "
686                 "and na.domain=a.domain "
687                 "and na.name=a.name "
688                 "and not f", 0 ) );
689         sub->enqueue(
690             new Query(
691                 "update na "
692                 "set id=nextval(pg_get_serial_sequence('addresses','id')) "
693                 "where id = 0 and not f", 0 ) );
694         insert =
695             new Query(
696                 "insert into addresses "
697                 "(id, name, localpart, domain) "
698                 "select id, name, localpart, domain "
699                 "from na where not f", this );
700         sub->enqueue( insert );
701         sub->execute();
702     }
703 
704     if ( !insert->done() )
705         return;
706 
707     if ( !obtain ) {
708         obtain = new Query( "select id, name, localpart::text, domain::text "
709                             "from na", this );
710         sub->enqueue( obtain );
711         sub->enqueue( new Query( "drop table na", 0 ) );
712         sub->commit();
713     }
714 
715     processSelect( obtain );
716 }
717 
718 
719 /*! \class ThreadRootCreator helperrowcreator.h
720 
    The ThreadRootCreator class creates thread_roots rows. The only
    particular here is that id() works on all the message-ids, not
    just the root ids.
724 */
725 
726 /*! Constructs a ThreadRootCreator that will make sure that the
727     messages in \a l are all threadable, using a subtransaction of \a
728     t for all db work.
729 */
730 
ThreadRootCreator(List<ThreadRootCreator::Message> * l,Transaction * t)731 ThreadRootCreator::ThreadRootCreator( List<ThreadRootCreator::Message> * l,
732                                       Transaction * t )
733     : HelperRowCreator( "thread_roots", t, "thread_roots_messageid_key" ),
734       messages( l ), nodes( new Dict<ThreadNode> ), first( true )
735 {
736     List<Message>::Iterator m( messages );
737     while ( m ) {
738         EStringList l = m->references();
739         l.append( m->messageId() );
740         ++m;
741         EStringList::Iterator s( l );
742         ThreadNode * root = 0;
743         while ( s ) {
744             if ( !s->isEmpty() ) {
745                 ThreadNode * n = nodes->find( *s );
746                 if ( !n ) {
747                     n = new ThreadNode( *s );
748                     nodes->insert( *s, n );
749                 }
750                 while ( n->parent )
751                     n = n->parent;
752                 if ( n == root )
753                     ;
754                 else if ( root )
755                     n->parent = root;
756                 else
757                     root = n;
758             }
759             ++s;
760         }
761     }
762 }
763 
764 
makeSelect()765 Query * ThreadRootCreator::makeSelect()
766 {
767     Query * q = 0;
768     EStringList l;
769     Dict<ThreadNode>::Iterator i( nodes );
770     while ( i ) {
771         if ( first || ( !i->parent && !i->trid ) )
772             l.append( i->id );
773         ++i;
774     }
775     if ( first ) // look for IDs in messages.thread_root - once
776         q = new Query( "select id, messageid as name from thread_roots "
777                        "where messageid=any($1::text[]) "
778                        "union "
779                        "select m.thread_root as id, hf.value as name "
780                        "from messages m join header_fields hf on "
781                        "(m.id=hf.message and hf.field=13) "
782                        "where hf.value=any($1::text[]) "
783                        "and m.thread_root is not null",
784                        this );
785     else
786         q = new Query( "select id, messageid as name from thread_roots "
787                        "where messageid=any($1::text[])",
788                        this );
789     first = false;
790     if ( l.isEmpty() )
791         return 0;
792     q->bind( 1, l );
793     return q;
794 }
795 
796 
makeCopy()797 Query * ThreadRootCreator::makeCopy()
798 {
799     Query * q = new Query( "copy thread_roots( messageid ) "
800                            "from stdin with binary", 0 );
801     Dict<ThreadNode>::Iterator i( nodes );
802     while ( i ) {
803         if ( !i->parent && !i->trid ) {
804             q->bind( 1, i->id );
805             q->submitLine();
806         }
807         ++i;
808     }
809     return q;
810 }
811 
812 
id(const EString & id)813 uint ThreadRootCreator::id( const EString & id )
814 {
815     ThreadNode * n = nodes->find( id );
816     if ( !n )
817         return 0;
818     while ( n->parent )
819         n = n->parent;
820     return n->trid;
821 }
822 
823 
add(const EString & id,uint i)824 void ThreadRootCreator::add( const EString & id, uint i )
825 {
826     ThreadNode * n = nodes->find( id );
827     if ( !n ) {
828         n = new ThreadNode( id );
829         nodes->insert( id, n );
830     }
831     n->trid = i;
832 }
833 
834 
postprocess(Transaction * t)835 void ThreadRootCreator::postprocess( Transaction * t )
836 {
837     Dict<ThreadNode>::Iterator i( nodes );
838     IntegerSet merged;
839     while ( i ) {
840         if ( i->parent && i->trid ) {
841             ThreadNode * p = i->parent;
842             while ( p->parent )
843                 p = p->parent;
844             if ( i->trid != p->trid ) {
845                 int big = i->trid;
846                 int small = p->trid;
847                 if ( big < small ) {
848                     big = p->trid;
849                     small = i->trid;
850                 }
851                 p->trid = small;
852                 if ( !merged.contains( big ) ) {
853                     Query * q = new Query( "select merge_threads( $1, $2 )", 0 );
854                     q->bind( 1, small );
855                     q->bind( 2, big );
856                     t->enqueue( q );
857                 }
858             }
859         }
860         ++i;
861     }
862 
863 }
864