1 // Copyright 2009 The Archiveopteryx Developers <info@aox.org>
2 
3 #include "transaction.h"
4 
5 #include "eventloop.h"
6 #include "database.h"
7 #include "query.h"
8 #include "event.h"
9 #include "scope.h"
10 #include "list.h"
11 
12 
13 class TransactionData
14     : public Garbage
15 {
16 public:
TransactionData()17     TransactionData()
18         : state( Transaction::Inactive ), parent( 0 ), activeChild( 0 ),
19           children( 0 ),
20           submittedCommit( false ), submittedBegin( false ),
21           committing( false ),
22           owner( 0 ), db( 0 ), queries( 0 ), failedQuery( 0 )
23     {}
24 
25     Transaction::State state;
26     Transaction * parent;
27     Transaction * activeChild;
28     EString savepoint;
29     uint children;
30     bool submittedCommit;
31     bool submittedBegin;
32     bool committing;
33     EventHandler * owner;
34     Database *db;
35 
36     List< Query > *queries;
37 
38     Query * failedQuery;
39     EString error;
40 
41     class CommitBouncer
42         : public EventHandler
43     {
44     public:
CommitBouncer(Transaction * t)45         CommitBouncer( Transaction * t ): q( 0 ), me( t ) {}
46 
execute()47         void execute () {
48             me->finalizeTransaction( q );
49         }
50 
51         Query * q;
52 
53     private:
54         Transaction * me;
55     };
56 
57     class BeginBouncer
58         : public EventHandler
59     {
60     public:
BeginBouncer(Transaction * t)61         BeginBouncer( Transaction * t ): q( 0 ), me( t ) {}
62 
execute()63         void execute () {
64             me->finalizeBegin( q );
65         }
66 
67         Query * q;
68 
69     private:
70         Transaction * me;
71     };
72 };
73 
74 
75 /*! \class Transaction transaction.h
76     This class manages a single database transaction.
77 
78     A Transaction accepts a series of queries via enqueue(), and sends
79     them to the server when execute() is called. It ends when commit()
80     or rollback() is called. Its state() indicates its progress.
81 
82     A Transaction commandeers a database handle when you ask it to
83     execute its queries, and keeps it until commit() or rollback(). If
84     you give it a database handle using setDatabase(), it'll use that
85     instead of asking for one.
86 
87     The Transaction can also provide subtransactions; these are
88     implemented using SAVEPOINT, RELEASE SAVEPOINT for commit() and
89     ROLLBACK TO SAVEPOINT for restart() and rollback.
90 
91     When you call subTransaction(), you get a new Transaction which
92     isn't yet active. The subtransaction becomes active when you
93     execute() or commit() it. Becoming active, in this context, means
94     that it enqueues a "savepoint" in its parent, and when that
    completes, the subtransaction is active. The parent becomes active
    again when the subtransaction finishes, with commit() ("release
    savepoint") or rollback() ("rollback to savepoint; release
    savepoint").
99 
100     It's possible to use a Transaction for any combination of
101     subtransactions and queries. A Query enqueued in the parent waits
102     until any active subtransaction finishes. Similarly, if you
103     execute() one subtransaction while another is active, the new
104     subtransaction will wait. The parent executes its queries in the
105     order submitted, including the "savepoint" queries that activate a
106     subtransaction.
107 */
108 
109 
110 /*! Creates a new Transaction owned by \a ev (which MUST NOT be 0). */
111 
Transaction(EventHandler * ev)112 Transaction::Transaction( EventHandler *ev )
113     : d( new TransactionData )
114 {
115     d->owner = ev;
116     d->savepoint = "s";
117 }
118 
119 
120 /*! Returns a pointer to a new Transaction that is subordinate to the
121     current one, but which can be independently committed or rolled
122     back.
123 
124     The returned subtransaction isn't active yet; if you call
125     execute() or commit() on it, it will attempt to take over its
126     parent's Database and do its work. If you don't call execute() on
127     the subtransaction before commit() on the parent, then the
128     subtransaction cannot be used.
129 
130     The subtransaction will notify \a ev when it succeeds or fails.
131 */
132 
subTransaction(EventHandler * ev)133 Transaction * Transaction::subTransaction( EventHandler * ev )
134 {
135     d->children++;
136 
137     if ( ev == 0 )
138         ev = d->owner;
139 
140     Transaction * t = new Transaction( ev );
141     t->d->parent = this;
142     t->d->savepoint = d->savepoint + "_" + fn( d->children );
143 
144     return t;
145 }
146 
147 
148 /*! Returns a pointer to the parent of this Transaction, which will be 0
149     if this is not a subTransaction().
150 */
151 
parent() const152 Transaction * Transaction::parent() const
153 {
154     return d->parent;
155 }
156 
157 
158 /*! Sets this Transaction's Database handle to \a db.
159 
160     This function is used by the Database when the BEGIN is processed.
161 */
162 
setDatabase(Database * db)163 void Transaction::setDatabase( Database *db )
164 {
165     d->db = db;
166 
167     Scope x( d->owner->log() );
168     log( "Using database connection " + fn( db->connectionNumber() ),
169          Log::Debug );
170 
171     if ( d->queries )
172         return;
173     d->queries = new List<Query>;
174     if ( !d->submittedBegin ) {
175         TransactionData::BeginBouncer * b
176             = new TransactionData::BeginBouncer( this );
177         b->q = new Query( "begin", b );
178         b->q->setTransaction( this );
179         d->queries->append( b->q );
180         d->submittedBegin = true;
181     }
182 }
183 
184 
185 /*! Sets this Transaction's state to \a s, which must be one of Inactive
186     (if it has not yet been started), Executing, Completed, or Failed.
187 */
188 
setState(State s)189 void Transaction::setState( State s )
190 {
191     d->state = s;
192 }
193 
194 
195 /*! Returns the current state of this Transaction. */
196 
state() const197 Transaction::State Transaction::state() const
198 {
199     return d->state;
200 }
201 
202 
203 /*! Returns true only if this Transaction has failed, and false if it
204     succeeded, or is in progress.
205 */
206 
failed() const207 bool Transaction::failed() const
208 {
209     return d->state == Failed;
210 }
211 
212 
213 /*! Returns true only if this Transaction has either succeeded or
214     failed, and false if it is still awaiting completion.
215 */
216 
done() const217 bool Transaction::done() const
218 {
219     return d->state == Completed ||
220         d->state == Failed ||
221         d->state == RolledBack;
222 }
223 
224 
225 /*! Clears this Transaction's error state (as set with setError()) and
226     places it in Executing state. Used to support savepoints.
227 */
228 
clearError()229 void Transaction::clearError()
230 {
231     d->failedQuery = 0;
232     d->error.truncate();
233     d->state = Executing;
234 }
235 
236 
237 /*! Sets this Transaction's state() to Failed, and records the error
238     message \a s. The first \a query that failed is recorded, and is
239     returned by failedQuery() (but \a query may be 0 if the failure
240     was not specific to a query within the transaction).
241 */
242 
setError(Query * query,const EString & s)243 void Transaction::setError( Query * query, const EString &s )
244 {
245     if ( d->state == Failed || !d->owner )
246         return;
247 
248     Scope x( d->owner->log() );
249     if ( query && query->canFail() )
250         log( s, Log::Debug );
251     else if ( d->parent )
252         log( s );
253     else
254         log( s, Log::Error );
255     d->failedQuery = query;
256     d->error = s;
257     d->state = Failed;
258     if ( !query )
259         return;
260     EString qs = query->string();
261     if ( qs.startsWith( "select " ) && qs.contains( " from " ) )
262         qs = qs.section( " from ", 1 ) + "...";
263     else if ( qs.startsWith( "insert into " ) && qs.contains( " values " ) )
264         qs = qs.section( " values ", 1 ) + "...";
265     else if ( qs.startsWith( "update " ) && qs.contains( " set " ) )
266         qs = qs.section( " set ", 1 ) + "...";
267     else if ( qs.length() > 32 )
268         qs = qs.mid( 0, 32 ) + "...";
269     d->error.append( " (query: " );
270     d->error.append( qs );
271     d->error.append( ")" );
272 }
273 
274 
275 /*! Returns the error message associated with this Transaction. This
276     value is meaningful only if the Transaction has failed().
277 */
278 
error() const279 EString Transaction::error() const
280 {
281     return d->error;
282 }
283 
284 
285 /*! Returns a pointer to the first Query in this transaction that
286     failed. The return value is meaningful only if the transaction
287     has failed, and 0 otherwise.
288 
289     The return value may also be 0 if the Transaction has been forcibly
290     rolled back by the Postgres class because of a timeout (such as the
291     caller forgetting to ever commit() the Transaction).
292 
293     This function is useful in composing error messages.
294 */
295 
failedQuery() const296 Query * Transaction::failedQuery() const
297 {
298     return d->failedQuery;
299 }
300 
301 
302 /*! Enqueues the query \a q within this Transaction, to be sent to the
303     server only after execute() is called. The BEGIN is automatically
304     enqueued before the first query in a Transaction.
305 */
306 
enqueue(Query * q)307 void Transaction::enqueue( Query *q )
308 {
309     Scope x( d->owner->log() );
310     if ( d->submittedCommit ) {
311         q->setError( "Query submitted after commit/rollback: " + q->string() );
312         return;
313     }
314     if ( !d->queries )
315         d->queries = new List< Query >;
316     q->setTransaction( this );
317     d->queries->append( q );
318     q->setState( Query::Submitted );
319     q->checkParameters();
320 }
321 
322 
323 /*! \overload
324 
325     This version creates a new Query based on \a text and enqueues it.
326     It does not give the caller a chance to set the query's owner or to
327     bind parameters, so it's most useful for things like DDL.
328 */
329 
enqueue(const char * text)330 void Transaction::enqueue( const char * text )
331 {
332     enqueue( new Query( text, 0 ) );
333 }
334 
335 
336 /*! \overload
337 
338     This version creates a new Query based on \a text and enqueues it.
339     Limitations as for the const char * variant above.
340 */
341 
enqueue(const EString & text)342 void Transaction::enqueue( const EString & text )
343 {
344     enqueue( new Query( text, 0 ) );
345 }
346 
347 
348 /*! Issues a ROLLBACK to abandon the Transaction, and fails any
349     queries that still haven't been sent to the server. The owner is
350     notified of completion.
351 */
352 
rollback()353 void Transaction::rollback()
354 {
355     if ( d->submittedCommit ) {
356         log( "rollback() called after commit" );
357         return;
358     }
359 
360     if ( !d->submittedBegin ) {
361         d->submittedBegin = true;
362         d->submittedCommit = true;
363         setState( RolledBack );
364         return;
365     }
366 
367     if ( d->parent ) {
368         // if we're a subtransaction, then what we need to do is roll
369         // back to the savepoint, then release it...
370         enqueue( new Query( "rollback to " + d->savepoint, 0 ) );
371         TransactionData::CommitBouncer * cb =
372             new TransactionData::CommitBouncer( this );
373         Query * q = new Query( "release savepoint " + d->savepoint, cb );
374         cb->q = q;
375         enqueue( q );
376         execute();
377     }
378     else {
379         TransactionData::CommitBouncer * cb =
380             new TransactionData::CommitBouncer( this );
381         Query * q = new Query( "rollback", cb );
382         cb->q = q;
383         enqueue( q );
384         execute();
385     }
386     d->submittedCommit = true;
387 
388 }
389 
390 
391 /*! Unwinds whatever the subtransaction has done and restarts it.
392 */
393 
restart()394 void Transaction::restart()
395 {
396     if ( d->submittedCommit ) {
397         log( "restart() called after commit/rollback" );
398         return;
399     }
400     else if ( !d->submittedBegin ) {
401         return;
402     }
403     else if ( !d->parent ) {
404         d->queries->clear();
405         enqueue( new Query( "rollback", d->owner ) );
406         d->submittedBegin = false;
407     }
408     else {
409         d->queries->clear();
410         enqueue( new Query( "rollback to " + d->savepoint, d->owner ) );
411         setState( Executing );
412     }
413     execute();
414 }
415 
416 
417 /*! Handles whatever needs to happen when a BEGIN or SAVEPOINT
418     finishes; \a q is the begin query.
419 */
420 
finalizeBegin(Query * q)421 void Transaction::finalizeBegin( Query * q )
422 {
423     if ( q->failed() ) {
424         if ( d->parent && d->parent->d->activeChild == this )
425             d->parent->d->activeChild = 0;
426         if ( d->parent )
427             setError( q, "Savepoint failed" );
428         else
429             setError( q, "Begin failed (huh?)" );
430         List<Query>::Iterator i( d->queries );
431         while ( i ) {
432             i->setError( "Transaction unable to start" );
433             ++i;
434         }
435         notify();
436         if ( d->parent )
437             d->parent->execute();
438     }
439     else {
440         setState( Executing );
441         execute();
442     }
443 }
444 
445 
446 /*! This private function handles whatever needs to happen when a
447     transaction finishes; \a q is the finishing query (typically
448     commit, rollback or release savepoint). There are three cases:
449 
450     If the commit/rollback works, we restart the parent.
451 
452     If a subtransaction is rolled back and the rollback fails, we're
453     in real trouble.
454 
455     If a subtransaction should commit and the release savepoint fails,
456     we roll the subtransaction back and should eventually hand over to
457     the parent transaction.
458 */
459 
finalizeTransaction(Query * q)460 void Transaction::finalizeTransaction( Query * q )
461 {
462     if ( !q->failed() ) {
463         if ( !d->committing )
464             setState( RolledBack );
465         else if ( d->state != Failed )
466             setState( Completed );
467         // the third case would be if ( failed ) setstate( failed )
468         notify();
469         if ( d->parent ) {
470             if ( d->parent->d->activeChild == this )
471                 d->parent->d->activeChild = 0;
472             parent()->execute();
473         }
474     }
475     else if ( d->committing ) {
476         d->committing = false;
477         d->submittedCommit = false;
478         rollback();
479         setError( q, q->error() );
480         notify();
481     }
482     else {
483         setError( q, q->error() );
484         notify();
485         // a rollback failed. how is this even possible? what to do?
486     }
487 }
488 
489 
490 /*! Issues a COMMIT to complete the Transaction (after sending any
491     queries that were already enqueued). The owner is notified when
492     the Transaction completes.
493 
494     For a failed() Transaction, commit() is equivalent to rollback().
495 */
496 
commit()497 void Transaction::commit()
498 {
499     if ( d->submittedCommit )
500         return;
501 
502     if ( !d->submittedBegin &&
503          ( !d->queries || d->queries->isEmpty() ) ) {
504         d->submittedBegin = true;
505         d->submittedCommit = true;
506         setState( Completed );
507         return;
508     }
509 
510     if ( d->parent ) {
511         TransactionData::CommitBouncer * cb =
512             new TransactionData::CommitBouncer( this );
513         Query * q = new Query( "release savepoint " + d->savepoint, cb );
514         cb->q = q;
515         enqueue( q );
516     }
517     else {
518         TransactionData::CommitBouncer * cb =
519             new TransactionData::CommitBouncer( this );
520         Query * q = new Query( "commit", cb );
521         cb->q = q;
522         enqueue( q );
523     }
524     d->submittedCommit = true;
525     d->committing = true;
526 
527     execute();
528 }
529 
530 
531 /*! Executes the queries enqueued so far. */
532 
execute()533 void Transaction::execute()
534 {
535     if ( !d->queries || d->queries->isEmpty() )
536         return;
537 
538     // we may need to set up queries in order to start
539     if ( !d->submittedBegin ) {
540         // if not, we have to obtain one
541         bool parentDone = false;
542         Transaction * p = d->parent;
543         while ( p && !parentDone ) {
544             if ( p->d->committing || p->done() )
545                 parentDone = true;
546             if ( p->state() == Inactive )
547                 p = p->d->parent;
548             else
549                 p = 0;
550         }
551         if ( parentDone ) {
552             List<Query>::Iterator i( d->queries );
553             while ( i ) {
554                 i->setError( "Transaction started after parent finished" );
555                 ++i;
556             }
557             return;
558         }
559         d->submittedBegin = true;
560         if ( d->parent ) {
561             // we ask to borrow our parent's handle
562             TransactionData::BeginBouncer * b
563                 = new TransactionData::BeginBouncer( this );
564             b->q = new Query( "savepoint " + d->savepoint, b );
565             d->parent->enqueue( b->q );
566             b->q->setTransaction( this );
567             d->parent->execute();
568         }
569         else {
570             // if we're an ordinary transaction, we queue a begin in
571             // the open pool...
572             TransactionData::BeginBouncer * b
573                 = new TransactionData::BeginBouncer( this );
574             b->q = new Query( "begin", b );
575             // ... and tell the db to shift control to us.
576             b->q->setTransaction( this );
577             Database::submit( b->q );
578         }
579     }
580 
581     // if we have a database, poke it
582     Transaction * t = this;
583     while ( t->d->parent )
584         t = t->d->parent;
585     if ( t->d->db )
586         t->d->db->processQueue();
587 }
588 
589 
590 /*! Returns a pointer to the owner of this query, as specified to the
591     constructor. Transactions MUST have owners, so this function may
592     not return 0. There is an exception: If the owner is severely
593     buggy, notify() may set the owner to 0 to avoid segfaults.
594 */
595 
owner() const596 EventHandler * Transaction::owner() const
597 {
598     return d->owner;
599 }
600 
601 
602 /*! Notifies the owner of this Transaction about a significant event. */
603 
notify()604 void Transaction::notify()
605 {
606     if ( !d->owner )
607         return;
608     Scope s( d->owner->log() );
609     try {
610         d->owner->execute();
611     }
612     catch ( Exception e ) {
613         d->owner = 0; // so we can't get close to a segfault again
614         if ( e == Invariant ) {
615             setError( 0,
616                       "Invariant failed "
617                       "while processing Transaction::notify()" );
618             rollback();
619             List<Connection>::Iterator i( EventLoop::global()->connections() );
620             while ( i ) {
621                 Connection * c = i;
622                 ++i;
623                 Log * l = Scope::current()->log();
624                 while ( l && l != c->log() )
625                     l = l->parent();
626                 if ( c->type() != Connection::Listener && l ) {
627                     Scope x( l );
628                     ::log( "Invariant failed; Closing connection abruptly",
629                            Log::Error );
630                     EventLoop::global()->removeConnection( c );
631                     c->close();
632                 }
633             }
634         }
635         else {
636             throw e;
637         }
638     }
639     if ( done() && d->parent &&
640          d->parent->d->queries &&
641          d->parent->d->queries->isEmpty() )
642         d->parent->notify();
643 }
644 
645 
646 /*! Returns a pointer to the currently active subtransaction, or to
647     this transaction is no subtransaction is active.
648 */
649 
activeSubTransaction()650 Transaction * Transaction::activeSubTransaction()
651 {
652     Transaction * t = this;
653     while ( t->d->parent )
654         t = t->d->parent;
655     while ( t->d->activeChild )
656         t = t->d->activeChild;
657     return t;
658 }
659 
660 
661 /*! Removes all queries that can be sent to the server from the front
662     of the queue and returns them. May change activeSubTransaction()
663     as a side effect, if the last query starts a subtransaction.
664 
665     The returned pointer is never null, but the list may be empty.
666 */
667 
submittedQueries()668 List< Query > * Transaction::submittedQueries()
669 {
670     List<Query> * r = new List<Query>();
671 
672     Transaction * t = activeSubTransaction();
673 
674     if ( !t->d->queries )
675         return r;
676 
677     bool last = false;
678     while ( !t->d->queries->isEmpty() && !last ) {
679         Query * q = t->d->queries->shift();
680         r->append( q );
681         // if we change to a subtransaction, we start picking queries
682         // there, and don't send anything more yet
683         if ( q->transaction() != t ) {
684             t->d->activeChild = q->transaction();
685             last = true;
686         }
687         // if the query is a copy, we have to let it finish before we
688         // can send more queries.
689         if ( q->inputLines() ) {
690             last = true;
691         }
692     }
693 
694     return r;
695 }
696