1 // Copyright 2009 The Archiveopteryx Developers <info@aox.org>
2
3 #include "transaction.h"
4
5 #include "eventloop.h"
6 #include "database.h"
7 #include "query.h"
8 #include "event.h"
9 #include "scope.h"
10 #include "list.h"
11
12
13 class TransactionData
14 : public Garbage
15 {
16 public:
TransactionData()17 TransactionData()
18 : state( Transaction::Inactive ), parent( 0 ), activeChild( 0 ),
19 children( 0 ),
20 submittedCommit( false ), submittedBegin( false ),
21 committing( false ),
22 owner( 0 ), db( 0 ), queries( 0 ), failedQuery( 0 )
23 {}
24
25 Transaction::State state;
26 Transaction * parent;
27 Transaction * activeChild;
28 EString savepoint;
29 uint children;
30 bool submittedCommit;
31 bool submittedBegin;
32 bool committing;
33 EventHandler * owner;
34 Database *db;
35
36 List< Query > *queries;
37
38 Query * failedQuery;
39 EString error;
40
41 class CommitBouncer
42 : public EventHandler
43 {
44 public:
CommitBouncer(Transaction * t)45 CommitBouncer( Transaction * t ): q( 0 ), me( t ) {}
46
execute()47 void execute () {
48 me->finalizeTransaction( q );
49 }
50
51 Query * q;
52
53 private:
54 Transaction * me;
55 };
56
57 class BeginBouncer
58 : public EventHandler
59 {
60 public:
BeginBouncer(Transaction * t)61 BeginBouncer( Transaction * t ): q( 0 ), me( t ) {}
62
execute()63 void execute () {
64 me->finalizeBegin( q );
65 }
66
67 Query * q;
68
69 private:
70 Transaction * me;
71 };
72 };
73
74
75 /*! \class Transaction transaction.h
76 This class manages a single database transaction.
77
78 A Transaction accepts a series of queries via enqueue(), and sends
79 them to the server when execute() is called. It ends when commit()
80 or rollback() is called. Its state() indicates its progress.
81
82 A Transaction commandeers a database handle when you ask it to
83 execute its queries, and keeps it until commit() or rollback(). If
84 you give it a database handle using setDatabase(), it'll use that
85 instead of asking for one.
86
87 The Transaction can also provide subtransactions; these are
88 implemented using SAVEPOINT, RELEASE SAVEPOINT for commit() and
89 ROLLBACK TO SAVEPOINT for restart() and rollback.
90
91 When you call subTransaction(), you get a new Transaction which
92 isn't yet active. The subtransaction becomes active when you
93 execute() or commit() it. Becoming active, in this context, means
94 that it enqueues a "savepoint" in its parent, and when that
95 completes, the subtransaction is active. The parent becomes active
96 again when the subtransaction finishes, with commit() ("release
    savepoint") or rollback() ("rollback to savepoint; release
98 savepoint").
99
100 It's possible to use a Transaction for any combination of
101 subtransactions and queries. A Query enqueued in the parent waits
102 until any active subtransaction finishes. Similarly, if you
103 execute() one subtransaction while another is active, the new
104 subtransaction will wait. The parent executes its queries in the
105 order submitted, including the "savepoint" queries that activate a
106 subtransaction.
107 */
108
109
110 /*! Creates a new Transaction owned by \a ev (which MUST NOT be 0). */
111
Transaction(EventHandler * ev)112 Transaction::Transaction( EventHandler *ev )
113 : d( new TransactionData )
114 {
115 d->owner = ev;
116 d->savepoint = "s";
117 }
118
119
120 /*! Returns a pointer to a new Transaction that is subordinate to the
121 current one, but which can be independently committed or rolled
122 back.
123
124 The returned subtransaction isn't active yet; if you call
125 execute() or commit() on it, it will attempt to take over its
126 parent's Database and do its work. If you don't call execute() on
127 the subtransaction before commit() on the parent, then the
128 subtransaction cannot be used.
129
130 The subtransaction will notify \a ev when it succeeds or fails.
131 */
132
subTransaction(EventHandler * ev)133 Transaction * Transaction::subTransaction( EventHandler * ev )
134 {
135 d->children++;
136
137 if ( ev == 0 )
138 ev = d->owner;
139
140 Transaction * t = new Transaction( ev );
141 t->d->parent = this;
142 t->d->savepoint = d->savepoint + "_" + fn( d->children );
143
144 return t;
145 }
146
147
/*! Returns a pointer to the parent of this Transaction, or 0 if this
    Transaction was not created by subTransaction().
*/

Transaction * Transaction::parent() const
{
    return d->parent;
}
156
157
158 /*! Sets this Transaction's Database handle to \a db.
159
160 This function is used by the Database when the BEGIN is processed.
161 */
162
setDatabase(Database * db)163 void Transaction::setDatabase( Database *db )
164 {
165 d->db = db;
166
167 Scope x( d->owner->log() );
168 log( "Using database connection " + fn( db->connectionNumber() ),
169 Log::Debug );
170
171 if ( d->queries )
172 return;
173 d->queries = new List<Query>;
174 if ( !d->submittedBegin ) {
175 TransactionData::BeginBouncer * b
176 = new TransactionData::BeginBouncer( this );
177 b->q = new Query( "begin", b );
178 b->q->setTransaction( this );
179 d->queries->append( b->q );
180 d->submittedBegin = true;
181 }
182 }
183
184
/*! Sets this Transaction's state to \a s, which should be one of
    Inactive (if it has not yet been started), Executing, Completed,
    RolledBack or Failed. The value is stored without any checking.
*/

void Transaction::setState( State s )
{
    d->state = s;
}
193
194
/*! Returns the current state of this Transaction, as last set by
    setState(), setError() or clearError(). A newly created
    Transaction is Inactive.
*/

Transaction::State Transaction::state() const
{
    return d->state;
}
201
202
203 /*! Returns true only if this Transaction has failed, and false if it
204 succeeded, or is in progress.
205 */
206
failed() const207 bool Transaction::failed() const
208 {
209 return d->state == Failed;
210 }
211
212
213 /*! Returns true only if this Transaction has either succeeded or
214 failed, and false if it is still awaiting completion.
215 */
216
done() const217 bool Transaction::done() const
218 {
219 return d->state == Completed ||
220 d->state == Failed ||
221 d->state == RolledBack;
222 }
223
224
225 /*! Clears this Transaction's error state (as set with setError()) and
226 places it in Executing state. Used to support savepoints.
227 */
228
clearError()229 void Transaction::clearError()
230 {
231 d->failedQuery = 0;
232 d->error.truncate();
233 d->state = Executing;
234 }
235
236
237 /*! Sets this Transaction's state() to Failed, and records the error
238 message \a s. The first \a query that failed is recorded, and is
239 returned by failedQuery() (but \a query may be 0 if the failure
240 was not specific to a query within the transaction).
241 */
242
setError(Query * query,const EString & s)243 void Transaction::setError( Query * query, const EString &s )
244 {
245 if ( d->state == Failed || !d->owner )
246 return;
247
248 Scope x( d->owner->log() );
249 if ( query && query->canFail() )
250 log( s, Log::Debug );
251 else if ( d->parent )
252 log( s );
253 else
254 log( s, Log::Error );
255 d->failedQuery = query;
256 d->error = s;
257 d->state = Failed;
258 if ( !query )
259 return;
260 EString qs = query->string();
261 if ( qs.startsWith( "select " ) && qs.contains( " from " ) )
262 qs = qs.section( " from ", 1 ) + "...";
263 else if ( qs.startsWith( "insert into " ) && qs.contains( " values " ) )
264 qs = qs.section( " values ", 1 ) + "...";
265 else if ( qs.startsWith( "update " ) && qs.contains( " set " ) )
266 qs = qs.section( " set ", 1 ) + "...";
267 else if ( qs.length() > 32 )
268 qs = qs.mid( 0, 32 ) + "...";
269 d->error.append( " (query: " );
270 d->error.append( qs );
271 d->error.append( ")" );
272 }
273
274
/*! Returns the error message recorded by setError(). This value is
    meaningful only if the Transaction has failed(); clearError()
    empties it again.
*/

EString Transaction::error() const
{
    return d->error;
}
283
284
/*! Returns a pointer to the first Query in this transaction that
    failed, as recorded by setError(). The return value is meaningful
    only if the transaction has failed, and is 0 otherwise.

    The return value may also be 0 for a failed transaction if the
    failure was not tied to a particular query, for example if the
    Transaction has been forcibly rolled back by the Postgres class
    because of a timeout (such as the caller forgetting to ever
    commit() the Transaction).

    This function is useful in composing error messages.
*/

Query * Transaction::failedQuery() const
{
    return d->failedQuery;
}
300
301
302 /*! Enqueues the query \a q within this Transaction, to be sent to the
303 server only after execute() is called. The BEGIN is automatically
304 enqueued before the first query in a Transaction.
305 */
306
enqueue(Query * q)307 void Transaction::enqueue( Query *q )
308 {
309 Scope x( d->owner->log() );
310 if ( d->submittedCommit ) {
311 q->setError( "Query submitted after commit/rollback: " + q->string() );
312 return;
313 }
314 if ( !d->queries )
315 d->queries = new List< Query >;
316 q->setTransaction( this );
317 d->queries->append( q );
318 q->setState( Query::Submitted );
319 q->checkParameters();
320 }
321
322
323 /*! \overload
324
325 This version creates a new Query based on \a text and enqueues it.
326 It does not give the caller a chance to set the query's owner or to
327 bind parameters, so it's most useful for things like DDL.
328 */
329
enqueue(const char * text)330 void Transaction::enqueue( const char * text )
331 {
332 enqueue( new Query( text, 0 ) );
333 }
334
335
336 /*! \overload
337
338 This version creates a new Query based on \a text and enqueues it.
339 Limitations as for the const char * variant above.
340 */
341
enqueue(const EString & text)342 void Transaction::enqueue( const EString & text )
343 {
344 enqueue( new Query( text, 0 ) );
345 }
346
347
348 /*! Issues a ROLLBACK to abandon the Transaction, and fails any
349 queries that still haven't been sent to the server. The owner is
350 notified of completion.
351 */
352
rollback()353 void Transaction::rollback()
354 {
355 if ( d->submittedCommit ) {
356 log( "rollback() called after commit" );
357 return;
358 }
359
360 if ( !d->submittedBegin ) {
361 d->submittedBegin = true;
362 d->submittedCommit = true;
363 setState( RolledBack );
364 return;
365 }
366
367 if ( d->parent ) {
368 // if we're a subtransaction, then what we need to do is roll
369 // back to the savepoint, then release it...
370 enqueue( new Query( "rollback to " + d->savepoint, 0 ) );
371 TransactionData::CommitBouncer * cb =
372 new TransactionData::CommitBouncer( this );
373 Query * q = new Query( "release savepoint " + d->savepoint, cb );
374 cb->q = q;
375 enqueue( q );
376 execute();
377 }
378 else {
379 TransactionData::CommitBouncer * cb =
380 new TransactionData::CommitBouncer( this );
381 Query * q = new Query( "rollback", cb );
382 cb->q = q;
383 enqueue( q );
384 execute();
385 }
386 d->submittedCommit = true;
387
388 }
389
390
391 /*! Unwinds whatever the subtransaction has done and restarts it.
392 */
393
restart()394 void Transaction::restart()
395 {
396 if ( d->submittedCommit ) {
397 log( "restart() called after commit/rollback" );
398 return;
399 }
400 else if ( !d->submittedBegin ) {
401 return;
402 }
403 else if ( !d->parent ) {
404 d->queries->clear();
405 enqueue( new Query( "rollback", d->owner ) );
406 d->submittedBegin = false;
407 }
408 else {
409 d->queries->clear();
410 enqueue( new Query( "rollback to " + d->savepoint, d->owner ) );
411 setState( Executing );
412 }
413 execute();
414 }
415
416
417 /*! Handles whatever needs to happen when a BEGIN or SAVEPOINT
418 finishes; \a q is the begin query.
419 */
420
finalizeBegin(Query * q)421 void Transaction::finalizeBegin( Query * q )
422 {
423 if ( q->failed() ) {
424 if ( d->parent && d->parent->d->activeChild == this )
425 d->parent->d->activeChild = 0;
426 if ( d->parent )
427 setError( q, "Savepoint failed" );
428 else
429 setError( q, "Begin failed (huh?)" );
430 List<Query>::Iterator i( d->queries );
431 while ( i ) {
432 i->setError( "Transaction unable to start" );
433 ++i;
434 }
435 notify();
436 if ( d->parent )
437 d->parent->execute();
438 }
439 else {
440 setState( Executing );
441 execute();
442 }
443 }
444
445
446 /*! This private function handles whatever needs to happen when a
447 transaction finishes; \a q is the finishing query (typically
448 commit, rollback or release savepoint). There are three cases:
449
450 If the commit/rollback works, we restart the parent.
451
452 If a subtransaction is rolled back and the rollback fails, we're
453 in real trouble.
454
455 If a subtransaction should commit and the release savepoint fails,
456 we roll the subtransaction back and should eventually hand over to
457 the parent transaction.
458 */
459
finalizeTransaction(Query * q)460 void Transaction::finalizeTransaction( Query * q )
461 {
462 if ( !q->failed() ) {
463 if ( !d->committing )
464 setState( RolledBack );
465 else if ( d->state != Failed )
466 setState( Completed );
467 // the third case would be if ( failed ) setstate( failed )
468 notify();
469 if ( d->parent ) {
470 if ( d->parent->d->activeChild == this )
471 d->parent->d->activeChild = 0;
472 parent()->execute();
473 }
474 }
475 else if ( d->committing ) {
476 d->committing = false;
477 d->submittedCommit = false;
478 rollback();
479 setError( q, q->error() );
480 notify();
481 }
482 else {
483 setError( q, q->error() );
484 notify();
485 // a rollback failed. how is this even possible? what to do?
486 }
487 }
488
489
490 /*! Issues a COMMIT to complete the Transaction (after sending any
491 queries that were already enqueued). The owner is notified when
492 the Transaction completes.
493
494 For a failed() Transaction, commit() is equivalent to rollback().
495 */
496
commit()497 void Transaction::commit()
498 {
499 if ( d->submittedCommit )
500 return;
501
502 if ( !d->submittedBegin &&
503 ( !d->queries || d->queries->isEmpty() ) ) {
504 d->submittedBegin = true;
505 d->submittedCommit = true;
506 setState( Completed );
507 return;
508 }
509
510 if ( d->parent ) {
511 TransactionData::CommitBouncer * cb =
512 new TransactionData::CommitBouncer( this );
513 Query * q = new Query( "release savepoint " + d->savepoint, cb );
514 cb->q = q;
515 enqueue( q );
516 }
517 else {
518 TransactionData::CommitBouncer * cb =
519 new TransactionData::CommitBouncer( this );
520 Query * q = new Query( "commit", cb );
521 cb->q = q;
522 enqueue( q );
523 }
524 d->submittedCommit = true;
525 d->committing = true;
526
527 execute();
528 }
529
530
531 /*! Executes the queries enqueued so far. */
532
execute()533 void Transaction::execute()
534 {
535 if ( !d->queries || d->queries->isEmpty() )
536 return;
537
538 // we may need to set up queries in order to start
539 if ( !d->submittedBegin ) {
540 // if not, we have to obtain one
541 bool parentDone = false;
542 Transaction * p = d->parent;
543 while ( p && !parentDone ) {
544 if ( p->d->committing || p->done() )
545 parentDone = true;
546 if ( p->state() == Inactive )
547 p = p->d->parent;
548 else
549 p = 0;
550 }
551 if ( parentDone ) {
552 List<Query>::Iterator i( d->queries );
553 while ( i ) {
554 i->setError( "Transaction started after parent finished" );
555 ++i;
556 }
557 return;
558 }
559 d->submittedBegin = true;
560 if ( d->parent ) {
561 // we ask to borrow our parent's handle
562 TransactionData::BeginBouncer * b
563 = new TransactionData::BeginBouncer( this );
564 b->q = new Query( "savepoint " + d->savepoint, b );
565 d->parent->enqueue( b->q );
566 b->q->setTransaction( this );
567 d->parent->execute();
568 }
569 else {
570 // if we're an ordinary transaction, we queue a begin in
571 // the open pool...
572 TransactionData::BeginBouncer * b
573 = new TransactionData::BeginBouncer( this );
574 b->q = new Query( "begin", b );
575 // ... and tell the db to shift control to us.
576 b->q->setTransaction( this );
577 Database::submit( b->q );
578 }
579 }
580
581 // if we have a database, poke it
582 Transaction * t = this;
583 while ( t->d->parent )
584 t = t->d->parent;
585 if ( t->d->db )
586 t->d->db->processQueue();
587 }
588
589
/*! Returns a pointer to the owner of this Transaction, as specified
    to the constructor (or to subTransaction()). Transactions MUST
    have owners, so this function normally does not return 0. There
    is an exception: If the owner is severely buggy, notify() may set
    the owner to 0 to avoid segfaults.
*/

EventHandler * Transaction::owner() const
{
    return d->owner;
}
600
601
602 /*! Notifies the owner of this Transaction about a significant event. */
603
notify()604 void Transaction::notify()
605 {
606 if ( !d->owner )
607 return;
608 Scope s( d->owner->log() );
609 try {
610 d->owner->execute();
611 }
612 catch ( Exception e ) {
613 d->owner = 0; // so we can't get close to a segfault again
614 if ( e == Invariant ) {
615 setError( 0,
616 "Invariant failed "
617 "while processing Transaction::notify()" );
618 rollback();
619 List<Connection>::Iterator i( EventLoop::global()->connections() );
620 while ( i ) {
621 Connection * c = i;
622 ++i;
623 Log * l = Scope::current()->log();
624 while ( l && l != c->log() )
625 l = l->parent();
626 if ( c->type() != Connection::Listener && l ) {
627 Scope x( l );
628 ::log( "Invariant failed; Closing connection abruptly",
629 Log::Error );
630 EventLoop::global()->removeConnection( c );
631 c->close();
632 }
633 }
634 }
635 else {
636 throw e;
637 }
638 }
639 if ( done() && d->parent &&
640 d->parent->d->queries &&
641 d->parent->d->queries->isEmpty() )
642 d->parent->notify();
643 }
644
645
646 /*! Returns a pointer to the currently active subtransaction, or to
647 this transaction is no subtransaction is active.
648 */
649
activeSubTransaction()650 Transaction * Transaction::activeSubTransaction()
651 {
652 Transaction * t = this;
653 while ( t->d->parent )
654 t = t->d->parent;
655 while ( t->d->activeChild )
656 t = t->d->activeChild;
657 return t;
658 }
659
660
661 /*! Removes all queries that can be sent to the server from the front
662 of the queue and returns them. May change activeSubTransaction()
663 as a side effect, if the last query starts a subtransaction.
664
665 The returned pointer is never null, but the list may be empty.
666 */
667
submittedQueries()668 List< Query > * Transaction::submittedQueries()
669 {
670 List<Query> * r = new List<Query>();
671
672 Transaction * t = activeSubTransaction();
673
674 if ( !t->d->queries )
675 return r;
676
677 bool last = false;
678 while ( !t->d->queries->isEmpty() && !last ) {
679 Query * q = t->d->queries->shift();
680 r->append( q );
681 // if we change to a subtransaction, we start picking queries
682 // there, and don't send anything more yet
683 if ( q->transaction() != t ) {
684 t->d->activeChild = q->transaction();
685 last = true;
686 }
687 // if the query is a copy, we have to let it finish before we
688 // can send more queries.
689 if ( q->inputLines() ) {
690 last = true;
691 }
692 }
693
694 return r;
695 }
696