1 // Copyright 2009 The Archiveopteryx Developers <info@aox.org>
2
3 #include "postgres.h"
4
5 #include "dict.h"
6 #include "list.h"
7 #include "estring.h"
8 #include "buffer.h"
9 #include "dbsignal.h"
10 #include "allocator.h"
11 #include "configuration.h"
12 #include "transaction.h"
13 #include "estringlist.h"
14 #include "pgmessage.h"
15 #include "eventloop.h"
16 #include "graph.h"
17 #include "query.h"
18 #include "event.h"
19 #include "scope.h"
20 #include "md5.h"
21 #include "log.h"
22
23 // crypt(), setreuid(), getpwnam()
24 #define _XOPEN_SOURCE 600
25 #include <sys/types.h>
26 #include <unistd.h>
27 #include <pwd.h>
28
29
30 static bool hasMessage( Buffer * );
31 static uint serverVersion;
32 static Postgres * listener = 0;
33
34
35 class PgData
36 : public Garbage
37 {
38 public:
PgData()39 PgData()
40 : active( false ), startup( false ), authenticated( false ),
41 unknownMessage( false ), identBreakageSeen( false ),
42 setSessionAuthorisation( false ),
43 sendingCopy( false ), error( false ),
44 keydata( 0 ),
45 description( 0 ), transaction( 0 ),
46 needNotify( 0 ), backendPid( 0 )
47 {}
48
49 bool active;
50 bool startup;
51 bool authenticated;
52 bool unknownMessage;
53 bool identBreakageSeen;
54 bool setSessionAuthorisation;
55 bool sendingCopy;
56 bool error;
57 EStringList listening;
58
59 PgKeyData *keydata;
60 PgRowDescription *description;
61 Dict<Postgres> prepared;
62 EStringList preparesPending;
63
64 List< Query > queries;
65 Transaction *transaction;
66 Query * needNotify;
67
68 EString user;
69
70 uint backendPid;
71
72 class LockSpotter
73 : public EventHandler {
74 public:
LockSpotter(uint p,Transaction * t)75 LockSpotter( uint p, Transaction * t ): EventHandler(), q( 0 ) {
76 setLog( new Log( t->owner()->log() ) );
77 Scope x( log() );
78 EString s(
79 "select h.pid::int, a.xact_start::text,"
80 " coalesce(a.client_addr::text,''::text) as client_addr, "
81 " a.current_query::text, "
82 " a.usename::text, "
83 " a.current_query,"
84 " w.locktype::text "
85 "from pg_locks h join pg_locks w using (locktype) "
86 "join pg_stat_activity a on (h.pid="
87 );
88 if (Postgres::version() < 90200)
89 s.append( "a.procpid" );
90 else
91 s.append( "a.pid" );
92 s.append(
93 ") "
94 "where h.granted and not w.granted and w.pid=$1 and "
95 "coalesce(h.relation::text, h.page::text, h.tuple::text, "
96 " h.transactionid::text, h.virtualxid)="
97 "coalesce(w.relation::text, w.page::text, w.tuple::text, "
98 " w.transactionid::text, w.virtualxid)"
99 );
100 q = new Query( s, this );
101 q->bind( 1, p );
102 q->execute();
103 }
execute()104 void execute() {
105 while ( q->hasResults() ) {
106 Row * r = q->nextRow();
107 log( "Transaction is waiting for a lock of type " +
108 r->getEString( "locktype" ).quoted() + ". "
109 "The following points to the lock's current holder. "
110 "PID: " + fn( r->getInt( "pid" ) ) + " "
111 "Transaction start time: " + r->getEString( "xact_start" ) + " "
112 "Username: " + r->getEString( "usename" ) + " "
113 "Client address: " + r->getEString( "client_addr" ) + " "
114 "Current query: " + r->getEString( "current_query" ).quoted(),
115 Log::Significant );
116 }
117 }
118 Query * q;
119 };
120 };
121
122
123 /*! \class Postgres postgres.h
124 Implements the PostgreSQL 3.0 Frontend-Backend protocol.
125
126 This is our interface to PostgreSQL. As a subclass of Database, it
127 accepts Query objects, sends queries to the database, and notifies
128 callers about any resulting data. As a descendant of Connection, it
129 is responsible for all network communications with the server.
130
131 The network protocol is documented at <doc/src/sgml/protocol.sgml>
132 and <http://www.postgresql.org/docs/current/static/protocol.html>.
133 The version implemented here is used by PostgreSQL 7.4 and later.
134
135 At the time of writing, there do not seem to be any other suitable
136 PostgreSQL client libraries available. For example, libpqxx doesn't
137 support asynchronous operation or prepared statements. Its interface
138 would be difficult to wrap in a database-agnostic manner, and it
139 depends on the untested libpq. The others aren't much better.
140 */
141
142 /*! Creates a Postgres object, initiates a TCP connection to the server,
143 registers with the main loop, and adds this Database to the list of
144 available handles.
145 */
146
Postgres()147 Postgres::Postgres()
148 : Database(), d( new PgData )
149 {
150 d->user = Database::user();
151 struct passwd * p = getpwnam( d->user.cstr() );
152 if ( p && getuid() != p->pw_uid ) {
153 // Try to cooperate with ident authentication.
154 uid_t e = geteuid();
155 setreuid( 0, p->pw_uid );
156 connect( address(), port() );
157 setreuid( 0, e );
158 }
159 else {
160 connect( address(), port() );
161 }
162
163 log( "Connecting to PostgreSQL server at " +
164 address() + ":" + fn( port() ) + " "
165 "(backend " + fn( connectionNumber() ) + ", fd " + fn( fd() ) +
166 ", user " + d->user + ")", Log::Debug );
167
168 if ( Connection::state() != Invalid ) {
169 setTimeoutAfter( 10 );
170 EventLoop::global()->addConnection( this );
171 }
172 }
173
174
~Postgres()175 Postgres::~Postgres()
176 {
177 EventLoop::global()->removeConnection( this );
178 }
179
180
processQueue()181 void Postgres::processQueue()
182 {
183 if ( !d->queries.isEmpty() )
184 return;
185
186 if ( d->sendingCopy )
187 return;
188
189 if ( d->transaction &&
190 ( d->transaction->state() == Transaction::Completed ||
191 d->transaction->state() == Transaction::RolledBack ) )
192 d->transaction = 0;
193
194 if ( !::listener && !d->transaction )
195 ::listener = this;
196 if ( ::listener == this )
197 sendListen();
198
199 List< Query > * l = 0;
200 if ( d->transaction ) {
201 l = d->transaction->submittedQueries();
202 }
203 else {
204 if ( listener == this && numHandles() > 1 )
205 l = Database::firstSubmittedQuery( false );
206 else
207 l = Database::firstSubmittedQuery( true );
208
209 if ( l->firstElement() && l->firstElement()->transaction() ) {
210 Transaction * t = l->firstElement()->transaction();
211 d->transaction = t;
212 t->setDatabase( this );
213 }
214 }
215
216 Query * q = l->shift();
217 while ( q ) {
218 q->setState( Query::Executing );
219 if ( !d->error ) {
220 processQuery( q );
221 }
222 else {
223 q->setError( "Database handle no longer usable." );
224 q->notify();
225 }
226 q = l->shift();
227 }
228
229 if ( d->queries.isEmpty() )
230 reactToIdleness();
231 }
232
233
234 /*! Sends whatever messages are required to make the backend process the
235 query \a q.
236 */
237
processQuery(Query * q)238 void Postgres::processQuery( Query * q )
239 {
240 Scope x( q->log() );
241 d->queries.append( q );
242 EString s( "Sent " );
243 if ( q->name() == "" ||
244 !d->prepared.contains( q->name() ) )
245 {
246 PgParse a( queryString( q ), q->name() );
247 a.enqueue( writeBuffer() );
248
249 if ( q->name() != "" ) {
250 d->prepared.insert( q->name(), this );
251 d->preparesPending.append( q->name() );
252 }
253
254 s.append( "parse/" );
255 }
256
257 PgBind b( q->name() );
258 b.bind( q->values() );
259 b.enqueue( writeBuffer() );
260
261 PgDescribe c;
262 c.enqueue( writeBuffer() );
263
264 PgExecute ex;
265 ex.enqueue( writeBuffer() );
266
267 PgSync e;
268 e.enqueue( writeBuffer() );
269
270 s.append( "execute for " );
271 s.append( q->description() );
272 s.append( " on backend " );
273 s.appendNumber( connectionNumber() );
274 ::log( s, Log::Debug );
275 recordExecution();
276 }
277
278
react(Event e)279 void Postgres::react( Event e )
280 {
281 switch ( e ) {
282 case Connect:
283 {
284 PgStartup msg;
285 msg.setOption( "user", d->user );
286 msg.setOption( "database", name() );
287 msg.setOption( "search_path",
288 Configuration::text( Configuration::DbSchema ) );
289 msg.enqueue( writeBuffer() );
290
291 d->active = true;
292 d->startup = true;
293 }
294 break;
295
296 case Read:
297 while ( d->active && hasMessage( readBuffer() ) ) {
298 /* We call a function to process every message we receive.
299 This function is expected to parse and remove a message
300 from the readBuffer, throwing an exception for malformed
301 messages, and setting d->unknownMessage for messages that
302 it can't or won't handle. */
303
304 char msg = (*readBuffer())[0];
305 try {
306 if ( d->startup ) {
307 if ( !d->authenticated )
308 authentication( msg );
309 else
310 backendStartup( msg );
311 }
312 else {
313 process( msg );
314 }
315
316 if ( d->unknownMessage )
317 unknown( msg );
318 }
319 catch ( PgServerMessage::Error e ) {
320 error( "Malformed " + EString( &msg, 1 ).quoted() +
321 " message received." );
322 }
323 }
324 if ( d->needNotify )
325 d->needNotify->notify();
326 d->needNotify = 0;
327
328 break;
329
330 case Error:
331 error( "Couldn't connect to PostgreSQL." );
332 break;
333
334 case Close:
335 if ( d->active )
336 error( "Connection terminated by the server." );
337 break;
338
339 case Timeout:
340 if ( d->transaction &&
341 ( d->transaction->state() == Transaction::Completed ||
342 d->transaction->state() == Transaction::RolledBack ) )
343 d->transaction = 0;
344
345 if ( !d->active || d->startup ) {
346 error( "Timeout negotiating connection to PostgreSQL." );
347 }
348 else if ( d->transaction ) {
349 if ( d->queries.isEmpty() )
350 d->transaction->rollback();
351 else if ( d->backendPid )
352 new PgData::LockSpotter( d->backendPid, d->transaction );
353 else
354 log( "Transaction unexpectedly slow; continuing " );
355 }
356 else if ( d->queries.isEmpty() &&
357 ::listener != this &&
358 server().protocol() != Endpoint::Unix &&
359 handlesNeeded() < numHandles() ) {
360 log( "Closing idle database backend " + fn( connectionNumber() ) +
361 " (" + fn( numHandles()-1 ) + " remaining)" );
362 shutdown();
363 }
364 break;
365
366 case Shutdown:
367 shutdown();
368 break;
369 }
370
371 if ( usable() ) {
372 processQueue();
373 if ( d->queries.isEmpty() && !d->transaction ) {
374 uint interval =
375 Configuration::scalar( Configuration::DbHandleInterval );
376 if ( ::listener == this )
377 interval = interval * 2;
378 else if ( idleHandles() > 2 && interval > 20 )
379 interval = 20;
380 setTimeoutAfter( interval );
381 }
382 }
383 if ( d->transaction && d->queries.isEmpty() ) {
384 // if the transaction doesn't do anything, just sits there
385 // holding its handle, we have to rollback() in order to free
386 // the handle for better work. we do it more quickly for a
387 // broken transaction than for one that seems fine.
388 if ( d->transaction->state() == Transaction::Failed )
389 setTimeoutAfter( 5 );
390 else
391 setTimeoutAfter( 20 );
392 }
393 }
394
395
396 /*! This function handles the authentication phase of the protocol. It
397 expects and responds to an authentication request, and waits for a
398 positive response before entering the backend startup phase. It is
399 called by react with the \a type of the message to process.
400 */
401
authentication(char type)402 void Postgres::authentication( char type )
403 {
404 switch ( type ) {
405 case 'R':
406 {
407 PgAuthRequest r( readBuffer() );
408
409 switch ( r.type() ) {
410 case PgAuthRequest::Success:
411 d->authenticated = true;
412 break;
413
414 case PgAuthRequest::Password:
415 case PgAuthRequest::Crypt:
416 case PgAuthRequest::MD5:
417 {
418 EString pass = password();
419
420 if ( d->setSessionAuthorisation ) {
421 error( "Cannot supply credentials during proxy "
422 "authentication" );
423 return;
424 }
425
426 if ( r.type() == PgAuthRequest::Crypt )
427 pass = ::crypt( pass.cstr(), r.salt().cstr() );
428 else if ( r.type() == PgAuthRequest::MD5 )
429 pass = "md5" + MD5::hash(
430 MD5::hash(
431 pass + d->user
432 ).hex() + r.salt()
433 ).hex();
434
435 PgPasswordMessage p( pass );
436 p.enqueue( writeBuffer() );
437 }
438 break;
439
440 default:
441 error( "Unsupported PgAuthRequest." );
442 break;
443 }
444 }
445 break;
446
447 default:
448 d->unknownMessage = true;
449 break;
450 }
451 }
452
453
454 /*! This function negotiates the backend startup phase of the protocol
455 (storing any messages the server sends us), concluding the startup
456 process when the server indicates that it is ready for queries. It
457 is called by react() with the \a type of the message to process.
458 */
459
backendStartup(char type)460 void Postgres::backendStartup( char type )
461 {
462 switch ( type ) {
463 case 'Z':
464 setTimeout( 0 );
465 d->startup = false;
466 if ( CitextLookup::necessary() )
467 processQuery( (new CitextLookup())->q );
468 addHandle( this );
469
470 // This successfully concludes connection startup. We'll leave
471 // this message unparsed, so that process() can handle it like
472 // any other PgReady.
473
474 if ( d->setSessionAuthorisation )
475 processQuery( new Query( "SET SESSION AUTHORIZATION " +
476 Database::user(), 0 ) );
477
478 break;
479
480 case 'K':
481 d->keydata = new PgKeyData( readBuffer() );
482 log( "Postgres backend " + fn( connectionNumber() ) +
483 " has pid " + fn( d->keydata->pid() ), Log::Debug );
484 d->backendPid = d->keydata->pid();
485 break;
486
487 default:
488 d->unknownMessage = true;
489 break;
490 }
491 }
492
493
494 /*! This function handles interaction with the server once the startup
495 phase is complete. It is called by react() with the \a type of the
496 message to process.
497 */
498
process(char type)499 void Postgres::process( char type )
500 {
501 Query * q = d->queries.firstElement();
502 Scope x;
503 if ( q && q->log() )
504 x.setLog( q->log() );
505
506 extendTimeout( 5 );
507
508 switch ( type ) {
509 case '1':
510 {
511 PgParseComplete msg( readBuffer() );
512 if ( q && q->name() != "" )
513 d->preparesPending.shift();
514 }
515 break;
516
517 case '2':
518 {
519 PgBindComplete msg( readBuffer() );
520 }
521 break;
522
523 case 'n':
524 {
525 PgNoData msg( readBuffer() );
526 }
527 break;
528
529 case 't':
530 (void)new PgParameterDescription( readBuffer() );
531 break;
532
533 case 'G':
534 {
535 PgCopyInResponse msg( readBuffer() );
536 if ( q && q->inputLines() ) {
537 log( "Sending " + fn( q->inputLines()->count() ) +
538 " data rows",
539 Log::Debug );
540 PgCopyData cd( q );
541 PgCopyDone e;
542
543 cd.enqueue( writeBuffer() );
544 e.enqueue( writeBuffer() );
545 }
546 else {
547 PgCopyFail f;
548 f.enqueue( writeBuffer() );
549 }
550
551 PgSync s;
552 s.enqueue( writeBuffer() );
553 d->sendingCopy = false;
554 }
555 break;
556
557 case 'T':
558 d->description = new PgRowDescription( readBuffer() );
559 break;
560
561 case 'D':
562 {
563 if ( !q || !d->description ) {
564 error( "Unexpected data row" );
565 return;
566 }
567
568 PgDataRow msg( readBuffer(), d->description );
569 q->addRow( msg.row() );
570 if ( d->needNotify && d->needNotify != q )
571 d->needNotify->notify();
572 d->needNotify = q;
573 }
574 break;
575
576 case 'I':
577 case 'C':
578 {
579 PgCommandComplete * cc = 0;
580 if ( type == 'C' )
581 cc = new PgCommandComplete( readBuffer() );
582 else
583 PgEmptyQueryResponse msg( readBuffer() );
584
585 if ( q ) {
586 EString s;
587 s.append( "Dequeueing query " );
588 s.append( q->description() );
589 s.append( " on backend " );
590 s.appendNumber( connectionNumber() );
591 EString command;
592 if ( cc )
593 command = cc->tag().section( " ", 1 );
594 if ( cc && !q->rows() ) {
595 uint an = 2;
596 if ( command == "INSERT" )
597 an = 3;
598 q->setRows( cc->tag().section( " ", an ).number( 0 ) );
599 }
600 if ( q->rows() ||
601 command == "SELECT" || command == "FETCH" ||
602 command == "INSERT" || command == "UPDATE" ) {
603 s.append( " (with " );
604 s.appendNumber( q->rows() );
605 s.append( " rows)" );
606 }
607 ::log( s, Log::Info );
608 if ( !q->done() ) {
609 q->setState( Query::Completed );
610 countQueries( q );
611 }
612 d->queries.shift();
613 q->notify();
614 d->needNotify = 0;
615 }
616 }
617 break;
618
619 case 'Z':
620 {
621 PgReady msg( readBuffer() );
622 setState( msg.state() );
623 }
624 break;
625
626 case 'A':
627 {
628 PgNotificationResponse msg( readBuffer() );
629 EString s;
630 if ( !msg.source().isEmpty() )
631 s = " (" + msg.source() + ")";
632 log( "Received notify " + msg.name().quoted() +
633 " from server pid " + fn( msg.pid() ) + s, Log::Debug );
634 DatabaseSignal::notifyAll( msg.name() );
635 }
636 break;
637
638 default:
639 d->unknownMessage = true;
640 break;
641 }
642 }
643
644
645 /*! This function handles unknown or unwanted messages that some other
646 function declined to process (by setting d->unknownMessage). It is
647 called by react() with the \a type of the unknown message.
648 */
649
unknown(char type)650 void Postgres::unknown( char type )
651 {
652 switch ( type ) {
653 case 'S':
654 {
655 d->unknownMessage = false;
656 PgParameterStatus msg( readBuffer() );
657
658 EString n = msg.name();
659 EString v = msg.value();
660 EString e;
661 bool known = true;
662 if ( n == "client_encoding" ) {
663 if ( v != "UTF8" && v != "SQL_ASCII" )
664 e = "Unexpected client encoding: ";
665 }
666 else if ( n == "DateStyle" ) {
667 // we want ISO on the list somewhere
668 if ( !v.containsWord( "ISO" ) )
669 e = "DateStyle apparently does not support ISO: ";
670 }
671 else if ( n == "integer_datetimes" ) {
672 // PG documentation says:
673 // "Use 64-bit integer storage for datetimes and
674 // intervals, rather than the default floating-point
675 // storage. This reduces the range of representable
676 // values but guarantees microsecond precision across
677 // the full range (see Section 8.5 for more
678 // information)."
679 // We don't care about that. Email uses only seconds,
680 // and only a fairly limited time range. Both on and
681 // off are okay.
682 }
683 else if ( n == "is_superuser" ) {
684 if ( v.simplified().lower() != "off" )
685 e = "Connected as superuser: ";
686 }
687 else if ( n == "server_encoding" ) {
688 if ( v != "UTF8" && v != "SQL_ASCII" )
689 e = "Unexpected server encoding: ";
690 }
691 else if ( n == "server_version" ) {
692 bool ok = true;
693 serverVersion = 10000 * v.section( ".", 1 ).number( &ok ) +
694 100 * v.section( ".", 2 ).number( &ok ) +
695 v.section( ".", 3 ).number( &ok );
696 if ( !ok || version() < 90100 )
697 e = "Archiveopteryx requires PostgreSQL 9.1 or higher: ";
698 }
699 else if ( n == "session_authorization" ) {
700 // we could test that v is d->user, but I don't think
701 // we care. besides it might sound an alarm about our
702 // ident workarounds.
703 }
704 else if ( n == "standard_conforming_strings" ) {
705 // hm... ?
706 }
707 else if ( n == "TimeZone" ) {
708 // we don't care.
709 }
710 else {
711 known = false;
712 }
713 if ( known && e.isEmpty() ) {
714 // we're entirely silent about this. all is well.
715 }
716 else {
717 EString s( "PostgreSQL server: " );
718 if ( e.isEmpty() )
719 s.append( "SET " );
720 else
721 s.append( e );
722 s.append( n );
723 s.append( "=" );
724 s.append( v.quoted() );
725 if ( e.isEmpty() )
726 ::log( s, Log::Debug );
727 else
728 ::log( s );
729 }
730 }
731 break;
732
733 case 'N':
734 case 'E':
735 d->unknownMessage = false;
736 serverMessage();
737 break;
738
739 default:
740 {
741 EString err = "Unexpected message (";
742
743 if ( type > 32 && type < 127 )
744 err.append( type );
745 else
746 err.append( "%" + fn( (int)type, 16 ) );
747
748 err.append( ") received" );
749 if ( d->startup ) {
750 if ( !d->authenticated )
751 err.append( " during authentication" );
752 else
753 err.append( " during backend startup" );
754 }
755 err.append( "." );
756 error( err );
757 }
758 break;
759 }
760 }
761
762
763 /*! This function handles errors and other messages from the server.
764
765 Uses the sqlstates specified
766 http://www.postgresql.org/docs/current/static/protocol.html
767 extensively.
768 */
769
serverMessage()770 void Postgres::serverMessage()
771 {
772 Scope x;
773 EString s;
774 PgMessage msg( readBuffer() );
775 Query *q = d->queries.firstElement();
776 EString m( msg.message() );
777 EString code = msg.code();
778 Endpoint server( peer() );
779
780 if ( code == "57P03" ) {
781 log( "Retrying connection after delay because PostgreSQL "
782 "is still starting up.", Log::Info );
783 close();
784 sleep( 1 );
785 connect( server );
786 }
787 else if ( code == "57P01" || code == "57P02" ) {
788 if ( code == "57P01" )
789 log( "PostgreSQL is shutting down; closing connection.", Log::Info );
790 else
791 log( "PostgreSQL reports a crash; closing connection.", Log::Info );
792 removeHandle( this );
793 if ( ::listener == this )
794 ::listener = 0;
795 close();
796 error( "PostgreSQL server shut down" );
797 }
798 else if ( code == "28000" && m.lower().containsWord( "ident" ) ) {
799 int b = m.find( '"' );
800 int e = m.find( '"', b+1 );
801 EString user( m.mid( b+1, e-b-1 ) );
802
803 struct passwd * u = getpwnam( d->user.cstr() );
804
805 struct passwd * p = 0;
806 const char * pg
807 = Configuration::compiledIn( Configuration::PgUser );
808
809 if ( pg )
810 p = getpwnam( pg );
811 if ( !p )
812 p = getpwnam( "postgres" );
813 if ( !p )
814 p = getpwnam( "pgsql" );
815
816 if ( !d->identBreakageSeen && loginAs() == DbOwner &&
817 u == 0 && p != 0 )
818 {
819 d->identBreakageSeen = true;
820 d->setSessionAuthorisation = true;
821 log( "Attempting to authenticate as superuser to use "
822 "SET SESSION AUTHORIZATION", Log::Info );
823 d->user = EString( p->pw_name );
824 uid_t e = geteuid();
825 setreuid( 0, p->pw_uid );
826 close();
827 connect( server );
828 setreuid( 0, e );
829 }
830 else if ( s == Configuration::text(Configuration::JailUser) &&
831 Configuration::toggle( Configuration::Security ) &&
832 self().protocol() != Endpoint::Unix )
833 {
834 // If we connected via IPv4 or IPv6, early enough that
835 // postgres had a chance to reject us, we'll try again.
836 d->identBreakageSeen = true;
837 log( "PostgreSQL demanded IDENT, which did not match "
838 "during startup. Retrying.", Log::Info );
839 close();
840 connect( server );
841 }
842 else {
843 log( "PostgreSQL refuses authentication because this "
844 "process is not running as user " + user.quoted() +
845 ". See http://aox.org/faq/mailstore#ident",
846 Log::Disaster );
847 }
848 }
849 else if ( code == "28000" ) {
850 log( "Cannot authenticate as PostgreSQL user " + d->user.quoted() +
851 ". Server message: " + msg.message(), Log::Disaster );
852 }
853 else if ( code.startsWith( "53" ) ) {
854 uint m = Configuration::scalar( Configuration::DbMaxHandles );
855 if ( code == "53000" )
856 log( "PostgreSQL server reports too many client connections. "
857 "Our connection count is " + fn( numHandles() ) + ", "
858 "configured maximum is " + fn( m ) + ".",
859 Log::Error );
860 else
861 log( "PostgreSQL server has a resource problem (" + code + "): " +
862 msg.message(),
863 Log::Significant );
864 if ( m > 2 ) {
865 log( "Setting db-max-handles to 2 (was " + fn( m ) + ")" );
866 Configuration::add( "db-max-handles = 2" );
867 }
868
869 }
870 else if ( msg.type() == PgMessage::Notification ) {
871 s.append( "PostgreSQL server: " );
872 if ( q ) {
873 s.append( "Query " + q->description() + ": " );
874 x.setLog( q->log() );
875 }
876 s.append( m );
877 if ( !code.startsWith( "00" ) )
878 s.append( " (warning)" );
879 ::log( s, Log::Debug );
880 }
881 else if ( q && !code.startsWith( "00" ) ) {
882 s.append( "PostgreSQL server: " );
883 s.append( "Query " + q->description() + " failed: " );
884 x.setLog( q->log() );
885 s.append( m );
886 if ( !msg.detail().isEmpty() )
887 s.append( " (" + msg.detail() + ")" );
888 s.append( " (" + code + ")" );
889
890 // If we sent a Parse message for a named prepared statement
891 // while processing this query, but don't already know that
892 // it succeeded, we'll assume that statement name does not
893 // exist for future use.
894 EString * pp = d->preparesPending.first();
895 if ( q->name() != "" && pp && *pp == q->name() ) {
896 d->prepared.remove( q->name() );
897 d->preparesPending.shift();
898 }
899 if ( q->inputLines() )
900 d->sendingCopy = false;
901 d->queries.shift();
902 m = mapped( m );
903 if ( !msg.detail().isEmpty() )
904 s.append( " (" + msg.detail() + ")" );
905 q->setError( m );
906 q->notify();
907 }
908 else {
909 ::log( "PostgreSQL server message could not be interpreted."
910 " Message: " + msg.message() +
911 " SQL state code: " + code +
912 " Severity: " + msg.severity().lower(),
913 Log::Error );
914 }
915
916 if ( code.startsWith( "08" ) ) // connection exception
917 error( "PostgreSQL server error: " + s );
918 }
919
920
// Maps the names of indexes and constraints in our schema to error
// messages fit for humans; used by Postgres::mapped(). The list is
// terminated by a {0,0} entry.
//
// these errors are based on a selection of the results from
// select indexname from pg_indexes where tablename in
// (select tablename from pg_tables where tableowner='aoxsuper')

static const struct {
    const char * constraint;
    const char * human;
} errormap[] = {
    // some index names
    {"addresses_nld_key",
     "Operation would create two identical addresses" },
    {"u_l",
     "Operation would create two users with identical login names"},
    // some constraints from our postgresql schema
    {"aliases_address_fkey", // contype f
     "Operation would create two aliases with the same address"},
    {"aliases_address_key", // contype u
     "Operation would create two aliases with the same address"},
    {"annotation_names_name_key", // contype u
     "Operation would create two annotation_names rows with the same name"},
    {"annotations_mailbox_key", // contype u
     "Operation would create a duplicate annotations row"},
    {"annotations_mailbox_key1", // contype u
     "Operation would create a duplicate annotations row"},
    // XXX where does the annotations unique condition end up?
    {"deliveries_message_key", // contype u
     "Operation would store the same message for delivery twice"},
    {"field_names_name_key", // contype u
     "Operation would create two header field names with the same name"},
    {"fn_uname",
     "Operation would store two identical flag names separately"},
    {"group_members_groupname_fkey", // contype f
     "Operation would create group_members row with invalid groupname"},
    {"group_members_member_fkey", // contype f
     "Operation would create group_members row with invalid member"},
    {"group_members_pkey", // contype p
     "Operation would create duplicate group_members row"},
    // XXX shouldn't groups.name be unique? and different from all users.name?
    {"mailboxes_name_key", // contype u
     "Operation would create two mailboxes with the same name"},
    {"mailboxes_owner_fkey", // contype f
     "Operation would create a mailbox without an owner"},
    {"messages_id_key", // contype u
     "Operation would create two messages objects with the same ID"},
    {"namespaces_name_key", // contype u
     "Operation would create two user namespaces with the same name"},
    {"permissions_mailbox_fkey", // contype f
     "Operation would create a permissions row without a mailbox"},
    {"permissions_pkey", // contype p
     "Operation would create a duplicate permissions row"},
    {"scripts_owner_key", // contype u
     "Operation would store two scripts with the same owner and name"},
    // XXX shouldn't users.alias be unique?
    // NOTE(review): the text for users_alias_fkey looks like a
    // placeholder rather than a sentence; left as it was.
    {"users_alias_fkey", // contype f
     "users_alias"},
    {"users_parentspace_fkey", // contype f
     "Operation would create a users row without a namespace"},
    {0,0}
};
980
981
982
983 /*! Looks for constraint names in \a s and returns an error message
984 corresponding to the relevant constraint. Returns \a s if it finds
985 none.
986 */
987
mapped(const EString & s) const988 EString Postgres::mapped( const EString & s ) const
989 {
990 if ( !s.contains( "_" ) )
991 return "PostgreSQL Server: " + s;
992
993 EString h;
994 uint maps = 0;
995 EString w;
996 uint i = 0;
997 while ( maps < 2 && i <= s.length() ) {
998 char c = s[i];
999 if ( ( c >= 'a' && c <= 'z' ) ||
1000 ( c >= 'A' && c <= 'Z' ) ||
1001 ( c >= '0' && c <= '9' ) ||
1002 ( c == '_' ) ) {
1003 w.append( c );
1004 }
1005 else if ( !w.isEmpty() ) {
1006 uint j = 0;
1007 while ( errormap[j].constraint && w != errormap[j].constraint )
1008 j++;
1009 if ( errormap[j].constraint ) {
1010 maps++;
1011 h = errormap[j].human;
1012 h.append( " (" );
1013 h.append( w );
1014 h.append( ")" );
1015 }
1016 w.truncate();
1017 }
1018 i++;
1019 }
1020 if ( maps != 1 )
1021 return "PostgreSQL Server: " + s;
1022
1023 return h;
1024 }
1025
1026
1027
1028 /*! Handles all protocol/socket errors by logging the error message \a s
1029 and closing the connection after emptying the write buffer and
1030 notifying any pending queries of the failure.
1031
1032 */
1033
error(const EString & s)1034 void Postgres::error( const EString &s )
1035 {
1036 if ( ::listener == this )
1037 ::listener = 0;
1038
1039 Scope x( log() );
1040 ::log( s + " (on backend " + fn( connectionNumber() ) + ")", Log::Error );
1041
1042 d->error = true;
1043 d->active = false;
1044 setState( Broken );
1045
1046 List< Query >::Iterator q( d->queries );
1047 while ( q ) {
1048 q->setError( s );
1049 q->notify();
1050 ++q;
1051 }
1052
1053 removeHandle( this );
1054
1055 writeBuffer()->remove( writeBuffer()->size() );
1056 Connection::setState( Closing );
1057 }
1058
1059
1060 /*! Sends a termination message and takes this database handle out of
1061 circulation gracefully.
1062 */
1063
shutdown()1064 void Postgres::shutdown()
1065 {
1066 if ( ::listener == this )
1067 ::listener = 0;
1068
1069 PgTerminate msg;
1070 msg.enqueue( writeBuffer() );
1071
1072 if ( d->transaction ) {
1073 d->transaction->setError( 0, "Database connection shutdown" );
1074 d->transaction->notify();
1075 }
1076 List< Query >::Iterator q( d->queries );
1077 while ( q ) {
1078 if ( !q->done() ) {
1079 q->setError( "Database connection shutdown" );
1080 q->notify();
1081 }
1082 ++q;
1083 }
1084
1085 removeHandle( this );
1086 d->active = false;
1087 }
1088
1089
hasMessage(Buffer * b)1090 static bool hasMessage( Buffer *b )
1091 {
1092 if ( b->size() < 5 ||
1093 b->size() < 1+( (uint)((*b)[1]<<24)|((*b)[2]<<16)|
1094 ((*b)[3]<<8)|((*b)[4]) ) )
1095 return false;
1096 return true;
1097 }
1098
1099
1100 /*! Returns true if this handle is willing to process new queries: i.e.
1101 if it has an active and error-free connection to the server, and no
1102 outstanding queries; and false otherwise.
1103 */
1104
usable() const1105 bool Postgres::usable() const
1106 {
1107 return ( d->active && !d->startup &&
1108 !( state() == Connecting || state() == Broken ) &&
1109 d->queries.isEmpty() );
1110 }
1111
1112
1113 static GraphableCounter * goodQueries = 0;
1114 static GraphableCounter * badQueries = 0;
1115
1116
1117 /*! Updates the statistics when \a q is done. */
1118
countQueries(class Query * q)1119 void Postgres::countQueries( class Query * q )
1120 {
1121 if ( !goodQueries ) {
1122 goodQueries = new GraphableCounter( "queries-executed" ); // bad name?
1123 badQueries = new GraphableCounter( "queries-failed" ); // bad name?
1124 }
1125
1126 if ( !q->failed() )
1127 goodQueries->tick();
1128 else if ( !q->canFail() )
1129 badQueries->tick();
1130 ; // a query which fails but canFail is not counted anywhere.
1131
1132 // later also use GraphableDataSet to keep track of query
1133 // execution times, but not right now.
1134 }
1135
1136
1137 /*! Returns the Postgres server's declared version number as an
1138 integer. 8.1.0 is returned as 80100, 8.3.2 as 80302.
1139
1140 The version number is learned immediately after connecting.
1141 version() returns 0 until the first Postgres instance learns the
1142 server version.
1143 */
1144
version()1145 uint Postgres::version()
1146 {
1147 return ::serverVersion;
1148 }
1149
1150
1151 /*! Makes sure Postgres sends as many LISTEN commands as necessary,
1152 see DatabaseSignal and
1153 http://www.postgresql.org/docs/8.1/static/sql-listen.html
1154
1155 */
1156
sendListen()1157 void Postgres::sendListen()
1158 {
1159 EStringList::Iterator s( DatabaseSignal::names() );
1160 while ( s ) {
1161 EString name = *s;
1162 ++s;
1163 if ( !d->listening.contains( name ) ) {
1164 d->listening.append( name );
1165 if ( !name.boring() )
1166 name = name.quoted();
1167 processQuery( new Query( "listen " + name, 0 ) );
1168 }
1169 }
1170 }
1171
1172
1173 /*! Returns the query string for \a q, after possibly applying
1174 version-specific hacks and workarounds. */
1175
queryString(Query * q)1176 EString Postgres::queryString( Query * q )
1177 {
1178 if ( !q->name().isEmpty() )
1179 return q->string();
1180
1181 EString s( q->string() );
1182
1183 if ( s != q->string() ) {
1184 Scope x( q->log() );
1185 log( "Changing query string to: " + s, Log::Debug );
1186 }
1187
1188 return s;
1189 }
1190
1191
1192 class PgCanceller
1193 : public Postgres
1194 {
1195 private:
1196 PgKeyData * k;
1197
1198 public:
PgCanceller(PgKeyData * key)1199 PgCanceller( PgKeyData * key )
1200 : Postgres(), k( key )
1201 {
1202 log( "Sending cancel for pid " + fn( k->pid() ), Log::Debug );
1203 }
1204
react(Event e)1205 void react( Event e )
1206 {
1207 switch (e) {
1208 case Connect:
1209 {
1210 PgCancel msg( k );
1211 msg.enqueue( writeBuffer() );
1212 Connection::setState( Closing );
1213 }
1214 break;
1215
1216 default:
1217 break;
1218 }
1219 }
1220 };
1221
1222
1223 /*! Issues a cancel request for the query \a q if it is being executed
1224 by this Postgres object. If not, it does nothing.
1225 */
1226
cancel(Query * q)1227 void Postgres::cancel( Query * q )
1228 {
1229 if ( d->queries.find( q ) )
1230 (void)new PgCanceller( d->keydata );
1231 }
1232