1 // Copyright 2009 The Archiveopteryx Developers <info@aox.org>
2 
3 #include "postgres.h"
4 
5 #include "dict.h"
6 #include "list.h"
7 #include "estring.h"
8 #include "buffer.h"
9 #include "dbsignal.h"
10 #include "allocator.h"
11 #include "configuration.h"
12 #include "transaction.h"
13 #include "estringlist.h"
14 #include "pgmessage.h"
15 #include "eventloop.h"
16 #include "graph.h"
17 #include "query.h"
18 #include "event.h"
19 #include "scope.h"
20 #include "md5.h"
21 #include "log.h"
22 
23 // crypt(), setreuid(), getpwnam()
24 #define _XOPEN_SOURCE 600
25 #include <sys/types.h>
26 #include <unistd.h>
27 #include <pwd.h>
28 
29 
30 static bool hasMessage( Buffer * );
31 static uint serverVersion;
32 static Postgres * listener = 0;
33 
34 
35 class PgData
36     : public Garbage
37 {
38 public:
PgData()39     PgData()
40         : active( false ), startup( false ), authenticated( false ),
41           unknownMessage( false ), identBreakageSeen( false ),
42           setSessionAuthorisation( false ),
43           sendingCopy( false ), error( false ),
44           keydata( 0 ),
45           description( 0 ), transaction( 0 ),
46           needNotify( 0 ), backendPid( 0 )
47         {}
48 
49     bool active;
50     bool startup;
51     bool authenticated;
52     bool unknownMessage;
53     bool identBreakageSeen;
54     bool setSessionAuthorisation;
55     bool sendingCopy;
56     bool error;
57     EStringList listening;
58 
59     PgKeyData *keydata;
60     PgRowDescription *description;
61     Dict<Postgres> prepared;
62     EStringList preparesPending;
63 
64     List< Query > queries;
65     Transaction *transaction;
66     Query * needNotify;
67 
68     EString user;
69 
70     uint backendPid;
71 
72     class LockSpotter
73         : public EventHandler {
74     public:
LockSpotter(uint p,Transaction * t)75         LockSpotter( uint p, Transaction * t ): EventHandler(), q( 0 ) {
76             setLog( new Log( t->owner()->log() ) );
77             Scope x( log() );
78             EString s(
79                 "select h.pid::int, a.xact_start::text,"
80                 " coalesce(a.client_addr::text,''::text) as client_addr, "
81                 " a.current_query::text, "
82                 " a.usename::text, "
83                 " a.current_query,"
84                 " w.locktype::text "
85                 "from pg_locks h join pg_locks w using (locktype) "
86                 "join pg_stat_activity a on (h.pid="
87             );
88             if (Postgres::version() < 90200)
89                 s.append( "a.procpid" );
90             else
91                 s.append( "a.pid" );
92             s.append(
93                 ") "
94                 "where h.granted and not w.granted and w.pid=$1 and "
95                 "coalesce(h.relation::text, h.page::text, h.tuple::text, "
96                 " h.transactionid::text, h.virtualxid)="
97                 "coalesce(w.relation::text, w.page::text, w.tuple::text, "
98                 " w.transactionid::text, w.virtualxid)"
99             );
100             q = new Query( s, this );
101             q->bind( 1, p );
102             q->execute();
103         }
execute()104         void execute() {
105             while ( q->hasResults() ) {
106                 Row * r = q->nextRow();
107                 log( "Transaction is waiting for a lock of type " +
108                      r->getEString( "locktype" ).quoted() + ". "
109                      "The following points to the lock's current holder. "
110                      "PID: " + fn( r->getInt( "pid" ) ) + " "
111                      "Transaction start time: " + r->getEString( "xact_start" ) + " "
112                      "Username: " + r->getEString( "usename" ) + " "
113                      "Client address: " + r->getEString( "client_addr" ) + " "
114                      "Current query: " + r->getEString( "current_query" ).quoted(),
115                      Log::Significant );
116             }
117         }
118         Query * q;
119     };
120 };
121 
122 
123 /*! \class Postgres postgres.h
124     Implements the PostgreSQL 3.0 Frontend-Backend protocol.
125 
126     This is our interface to PostgreSQL. As a subclass of Database, it
127     accepts Query objects, sends queries to the database, and notifies
128     callers about any resulting data. As a descendant of Connection, it
129     is responsible for all network communications with the server.
130 
131     The network protocol is documented at <doc/src/sgml/protocol.sgml>
132     and <http://www.postgresql.org/docs/current/static/protocol.html>.
133     The version implemented here is used by PostgreSQL 7.4 and later.
134 
135     At the time of writing, there do not seem to be any other suitable
136     PostgreSQL client libraries available. For example, libpqxx doesn't
137     support asynchronous operation or prepared statements. Its interface
138     would be difficult to wrap in a database-agnostic manner, and it
139     depends on the untested libpq. The others aren't much better.
140 */
141 
142 /*! Creates a Postgres object, initiates a TCP connection to the server,
143     registers with the main loop, and adds this Database to the list of
144     available handles.
145 */
146 
Postgres()147 Postgres::Postgres()
148     : Database(), d( new PgData )
149 {
150     d->user = Database::user();
151     struct passwd * p = getpwnam( d->user.cstr() );
152     if ( p && getuid() != p->pw_uid ) {
153         // Try to cooperate with ident authentication.
154         uid_t e = geteuid();
155         setreuid( 0, p->pw_uid );
156         connect( address(), port() );
157         setreuid( 0, e );
158     }
159     else {
160         connect( address(), port() );
161     }
162 
163     log( "Connecting to PostgreSQL server at " +
164          address() + ":" + fn( port() ) + " "
165          "(backend " + fn( connectionNumber() ) + ", fd " + fn( fd() ) +
166          ", user " + d->user + ")", Log::Debug );
167 
168     if ( Connection::state() != Invalid ) {
169         setTimeoutAfter( 10 );
170         EventLoop::global()->addConnection( this );
171     }
172 }
173 
174 
~Postgres()175 Postgres::~Postgres()
176 {
177     EventLoop::global()->removeConnection( this );
178 }
179 
180 
processQueue()181 void Postgres::processQueue()
182 {
183     if ( !d->queries.isEmpty() )
184         return;
185 
186     if ( d->sendingCopy )
187         return;
188 
189     if ( d->transaction &&
190          ( d->transaction->state() == Transaction::Completed ||
191            d->transaction->state() == Transaction::RolledBack ) )
192         d->transaction = 0;
193 
194     if ( !::listener && !d->transaction )
195         ::listener = this;
196     if ( ::listener == this )
197         sendListen();
198 
199     List< Query > * l = 0;
200     if ( d->transaction ) {
201         l = d->transaction->submittedQueries();
202     }
203     else {
204         if ( listener == this && numHandles() > 1 )
205             l = Database::firstSubmittedQuery( false );
206         else
207             l = Database::firstSubmittedQuery( true );
208 
209         if ( l->firstElement() && l->firstElement()->transaction() ) {
210             Transaction * t = l->firstElement()->transaction();
211             d->transaction = t;
212             t->setDatabase( this );
213         }
214     }
215 
216     Query * q = l->shift();
217     while ( q ) {
218         q->setState( Query::Executing );
219         if ( !d->error ) {
220             processQuery( q );
221         }
222         else {
223             q->setError( "Database handle no longer usable." );
224             q->notify();
225         }
226         q = l->shift();
227     }
228 
229     if ( d->queries.isEmpty() )
230         reactToIdleness();
231 }
232 
233 
234 /*! Sends whatever messages are required to make the backend process the
235     query \a q.
236 */
237 
processQuery(Query * q)238 void Postgres::processQuery( Query * q )
239 {
240     Scope x( q->log() );
241     d->queries.append( q );
242     EString s( "Sent " );
243     if ( q->name() == "" ||
244          !d->prepared.contains( q->name() ) )
245     {
246         PgParse a( queryString( q ), q->name() );
247         a.enqueue( writeBuffer() );
248 
249         if ( q->name() != "" ) {
250             d->prepared.insert( q->name(), this );
251             d->preparesPending.append( q->name() );
252         }
253 
254         s.append( "parse/" );
255     }
256 
257     PgBind b( q->name() );
258     b.bind( q->values() );
259     b.enqueue( writeBuffer() );
260 
261     PgDescribe c;
262     c.enqueue( writeBuffer() );
263 
264     PgExecute ex;
265     ex.enqueue( writeBuffer() );
266 
267     PgSync e;
268     e.enqueue( writeBuffer() );
269 
270     s.append( "execute for " );
271     s.append( q->description() );
272     s.append( " on backend " );
273     s.appendNumber( connectionNumber() );
274     ::log( s, Log::Debug );
275     recordExecution();
276 }
277 
278 
react(Event e)279 void Postgres::react( Event e )
280 {
281     switch ( e ) {
282     case Connect:
283         {
284             PgStartup msg;
285             msg.setOption( "user", d->user );
286             msg.setOption( "database", name() );
287             msg.setOption( "search_path",
288                            Configuration::text( Configuration::DbSchema ) );
289             msg.enqueue( writeBuffer() );
290 
291             d->active = true;
292             d->startup = true;
293         }
294         break;
295 
296     case Read:
297         while ( d->active && hasMessage( readBuffer() ) ) {
298             /* We call a function to process every message we receive.
299                This function is expected to parse and remove a message
300                from the readBuffer, throwing an exception for malformed
301                messages, and setting d->unknownMessage for messages that
302                it can't or won't handle. */
303 
304             char msg = (*readBuffer())[0];
305             try {
306                 if ( d->startup ) {
307                     if ( !d->authenticated )
308                         authentication( msg );
309                     else
310                         backendStartup( msg );
311                 }
312                 else {
313                     process( msg );
314                 }
315 
316                 if ( d->unknownMessage )
317                     unknown( msg );
318             }
319             catch ( PgServerMessage::Error e ) {
320                 error( "Malformed " + EString( &msg, 1 ).quoted() +
321                        " message received." );
322             }
323         }
324         if ( d->needNotify )
325             d->needNotify->notify();
326         d->needNotify = 0;
327 
328         break;
329 
330     case Error:
331         error( "Couldn't connect to PostgreSQL." );
332         break;
333 
334     case Close:
335         if ( d->active )
336             error( "Connection terminated by the server." );
337         break;
338 
339     case Timeout:
340         if ( d->transaction &&
341              ( d->transaction->state() == Transaction::Completed ||
342                d->transaction->state() == Transaction::RolledBack ) )
343             d->transaction = 0;
344 
345         if ( !d->active || d->startup ) {
346             error( "Timeout negotiating connection to PostgreSQL." );
347         }
348         else if ( d->transaction ) {
349             if ( d->queries.isEmpty() )
350                 d->transaction->rollback();
351             else if ( d->backendPid )
352                 new PgData::LockSpotter( d->backendPid, d->transaction );
353             else
354                 log( "Transaction unexpectedly slow; continuing " );
355         }
356         else if ( d->queries.isEmpty() &&
357                   ::listener != this &&
358                   server().protocol() != Endpoint::Unix &&
359                   handlesNeeded() < numHandles() ) {
360             log( "Closing idle database backend " + fn( connectionNumber() ) +
361                  " (" + fn( numHandles()-1 ) + " remaining)" );
362             shutdown();
363         }
364         break;
365 
366     case Shutdown:
367         shutdown();
368         break;
369     }
370 
371     if ( usable() ) {
372         processQueue();
373         if ( d->queries.isEmpty() && !d->transaction ) {
374             uint interval =
375                 Configuration::scalar( Configuration::DbHandleInterval );
376             if ( ::listener == this )
377                 interval = interval * 2;
378             else if ( idleHandles() > 2 && interval > 20 )
379                 interval = 20;
380             setTimeoutAfter( interval );
381         }
382     }
383     if ( d->transaction && d->queries.isEmpty() ) {
384         // if the transaction doesn't do anything, just sits there
385         // holding its handle, we have to rollback() in order to free
386         // the handle for better work. we do it more quickly for a
387         // broken transaction than for one that seems fine.
388         if ( d->transaction->state() == Transaction::Failed )
389             setTimeoutAfter( 5 );
390         else
391             setTimeoutAfter( 20 );
392     }
393 }
394 
395 
396 /*! This function handles the authentication phase of the protocol. It
397     expects and responds to an authentication request, and waits for a
398     positive response before entering the backend startup phase. It is
399     called by react with the \a type of the message to process.
400 */
401 
authentication(char type)402 void Postgres::authentication( char type )
403 {
404     switch ( type ) {
405     case 'R':
406         {
407             PgAuthRequest r( readBuffer() );
408 
409             switch ( r.type() ) {
410             case PgAuthRequest::Success:
411                 d->authenticated = true;
412                 break;
413 
414             case PgAuthRequest::Password:
415             case PgAuthRequest::Crypt:
416             case PgAuthRequest::MD5:
417                 {
418                     EString pass = password();
419 
420                     if ( d->setSessionAuthorisation ) {
421                         error( "Cannot supply credentials during proxy "
422                                "authentication" );
423                         return;
424                     }
425 
426                     if ( r.type() == PgAuthRequest::Crypt )
427                         pass = ::crypt( pass.cstr(), r.salt().cstr() );
428                     else if ( r.type() == PgAuthRequest::MD5 )
429                         pass = "md5" + MD5::hash(
430                                            MD5::hash(
431                                                pass + d->user
432                                            ).hex() + r.salt()
433                                        ).hex();
434 
435                     PgPasswordMessage p( pass );
436                     p.enqueue( writeBuffer() );
437                 }
438                 break;
439 
440             default:
441                 error( "Unsupported PgAuthRequest." );
442                 break;
443             }
444         }
445         break;
446 
447     default:
448         d->unknownMessage = true;
449         break;
450     }
451 }
452 
453 
454 /*! This function negotiates the backend startup phase of the protocol
455     (storing any messages the server sends us), concluding the startup
456     process when the server indicates that it is ready for queries. It
457     is called by react() with the \a type of the message to process.
458 */
459 
backendStartup(char type)460 void Postgres::backendStartup( char type )
461 {
462     switch ( type ) {
463     case 'Z':
464         setTimeout( 0 );
465         d->startup = false;
466         if ( CitextLookup::necessary() )
467             processQuery( (new CitextLookup())->q );
468         addHandle( this );
469 
470         // This successfully concludes connection startup. We'll leave
471         // this message unparsed, so that process() can handle it like
472         // any other PgReady.
473 
474         if ( d->setSessionAuthorisation )
475             processQuery( new Query( "SET SESSION AUTHORIZATION " +
476                                      Database::user(), 0 ) );
477 
478         break;
479 
480     case 'K':
481         d->keydata = new PgKeyData( readBuffer() );
482         log( "Postgres backend " + fn( connectionNumber() ) +
483              " has pid " + fn( d->keydata->pid() ), Log::Debug );
484         d->backendPid = d->keydata->pid();
485         break;
486 
487     default:
488         d->unknownMessage = true;
489         break;
490     }
491 }
492 
493 
494 /*! This function handles interaction with the server once the startup
495     phase is complete. It is called by react() with the \a type of the
496     message to process.
497 */
498 
process(char type)499 void Postgres::process( char type )
500 {
501     Query * q = d->queries.firstElement();
502     Scope x;
503     if ( q && q->log() )
504         x.setLog( q->log() );
505 
506     extendTimeout( 5 );
507 
508     switch ( type ) {
509     case '1':
510         {
511             PgParseComplete msg( readBuffer() );
512             if ( q && q->name() != "" )
513                 d->preparesPending.shift();
514         }
515         break;
516 
517     case '2':
518         {
519             PgBindComplete msg( readBuffer() );
520         }
521         break;
522 
523     case 'n':
524         {
525             PgNoData msg( readBuffer() );
526         }
527         break;
528 
529     case 't':
530         (void)new PgParameterDescription( readBuffer() );
531         break;
532 
533     case 'G':
534         {
535             PgCopyInResponse msg( readBuffer() );
536             if ( q && q->inputLines() ) {
537                 log( "Sending " + fn( q->inputLines()->count() ) +
538                      " data rows",
539                      Log::Debug );
540                 PgCopyData cd( q );
541                 PgCopyDone e;
542 
543                 cd.enqueue( writeBuffer() );
544                 e.enqueue( writeBuffer() );
545             }
546             else {
547                 PgCopyFail f;
548                 f.enqueue( writeBuffer() );
549             }
550 
551             PgSync s;
552             s.enqueue( writeBuffer() );
553             d->sendingCopy = false;
554         }
555         break;
556 
557     case 'T':
558         d->description = new PgRowDescription( readBuffer() );
559         break;
560 
561     case 'D':
562         {
563             if ( !q || !d->description ) {
564                 error( "Unexpected data row" );
565                 return;
566             }
567 
568             PgDataRow msg( readBuffer(), d->description );
569             q->addRow( msg.row() );
570             if ( d->needNotify && d->needNotify != q )
571                 d->needNotify->notify();
572             d->needNotify = q;
573         }
574         break;
575 
576     case 'I':
577     case 'C':
578         {
579             PgCommandComplete * cc = 0;
580             if ( type == 'C' )
581                 cc = new PgCommandComplete( readBuffer() );
582             else
583                 PgEmptyQueryResponse msg( readBuffer() );
584 
585             if ( q ) {
586                 EString s;
587                 s.append( "Dequeueing query " );
588                 s.append( q->description() );
589                 s.append( " on backend " );
590                 s.appendNumber( connectionNumber() );
591                 EString command;
592                 if ( cc )
593                     command = cc->tag().section( " ", 1 );
594                 if ( cc && !q->rows() ) {
595                     uint an = 2;
596                     if ( command == "INSERT" )
597                         an = 3;
598                     q->setRows( cc->tag().section( " ", an ).number( 0 ) );
599                 }
600                 if ( q->rows() ||
601                      command == "SELECT" || command == "FETCH" ||
602                      command == "INSERT" || command == "UPDATE" ) {
603                     s.append( " (with " );
604                     s.appendNumber( q->rows() );
605                     s.append( " rows)" );
606                 }
607                 ::log( s, Log::Info );
608                 if ( !q->done() ) {
609                     q->setState( Query::Completed );
610                     countQueries( q );
611                 }
612                 d->queries.shift();
613                 q->notify();
614                 d->needNotify = 0;
615             }
616         }
617         break;
618 
619     case 'Z':
620         {
621             PgReady msg( readBuffer() );
622             setState( msg.state() );
623         }
624         break;
625 
626     case 'A':
627         {
628             PgNotificationResponse msg( readBuffer() );
629             EString s;
630             if ( !msg.source().isEmpty() )
631                 s = " (" + msg.source() + ")";
632             log( "Received notify " + msg.name().quoted() +
633                  " from server pid " + fn( msg.pid() ) + s, Log::Debug );
634             DatabaseSignal::notifyAll( msg.name() );
635         }
636         break;
637 
638     default:
639         d->unknownMessage = true;
640         break;
641     }
642 }
643 
644 
645 /*! This function handles unknown or unwanted messages that some other
646     function declined to process (by setting d->unknownMessage). It is
647     called by react() with the \a type of the unknown message.
648 */
649 
unknown(char type)650 void Postgres::unknown( char type )
651 {
652     switch ( type ) {
653     case 'S':
654         {
655             d->unknownMessage = false;
656             PgParameterStatus msg( readBuffer() );
657 
658             EString n = msg.name();
659             EString v = msg.value();
660             EString e;
661             bool known = true;
662             if ( n == "client_encoding" ) {
663                 if ( v != "UTF8" && v != "SQL_ASCII" )
664                     e = "Unexpected client encoding: ";
665             }
666             else if ( n == "DateStyle" ) {
667                 // we want ISO on the list somewhere
668                 if ( !v.containsWord( "ISO" ) )
669                     e = "DateStyle apparently does not support ISO: ";
670             }
671             else if ( n == "integer_datetimes" ) {
672                 // PG documentation says:
673                 //     "Use 64-bit integer storage for datetimes and
674                 //     intervals, rather than the default floating-point
675                 //     storage. This reduces the range of representable
676                 //     values but guarantees microsecond precision across
677                 //     the full range (see Section 8.5 for more
678                 //     information)."
679                 // We don't care about that. Email uses only seconds,
680                 // and only a fairly limited time range. Both on and
681                 // off are okay.
682             }
683             else if ( n == "is_superuser" ) {
684                 if ( v.simplified().lower() != "off" )
685                     e = "Connected as superuser: ";
686             }
687             else if ( n == "server_encoding" ) {
688                 if ( v != "UTF8" && v != "SQL_ASCII" )
689                     e = "Unexpected server encoding: ";
690             }
691             else if ( n == "server_version" ) {
692                 bool ok = true;
693                 serverVersion = 10000 * v.section( ".", 1 ).number( &ok ) +
694                                 100 * v.section( ".", 2 ).number( &ok ) +
695                                 v.section( ".", 3 ).number( &ok );
696                 if ( !ok || version() < 90100 )
697                     e = "Archiveopteryx requires PostgreSQL 9.1 or higher: ";
698             }
699             else if ( n == "session_authorization" ) {
700                 // we could test that v is d->user, but I don't think
701                 // we care. besides it might sound an alarm about our
702                 // ident workarounds.
703             }
704             else if ( n == "standard_conforming_strings" ) {
705                 // hm... ?
706             }
707             else if ( n == "TimeZone" ) {
708                 // we don't care.
709             }
710             else {
711                 known = false;
712             }
713             if ( known && e.isEmpty() ) {
714                 // we're entirely silent about this. all is well.
715             }
716             else {
717                 EString s( "PostgreSQL server: " );
718                 if ( e.isEmpty() )
719                     s.append( "SET " );
720                 else
721                     s.append( e );
722                 s.append( n );
723                 s.append( "=" );
724                 s.append( v.quoted() );
725                 if ( e.isEmpty() )
726                     ::log( s, Log::Debug );
727                 else
728                     ::log( s );
729             }
730         }
731         break;
732 
733     case 'N':
734     case 'E':
735         d->unknownMessage = false;
736         serverMessage();
737         break;
738 
739     default:
740         {
741             EString err = "Unexpected message (";
742 
743             if ( type > 32 && type < 127 )
744                 err.append( type );
745             else
746                 err.append( "%" + fn( (int)type, 16 ) );
747 
748             err.append( ") received" );
749             if ( d->startup ) {
750                 if ( !d->authenticated )
751                     err.append( " during authentication" );
752                 else
753                     err.append( " during backend startup" );
754             }
755             err.append( "." );
756             error( err );
757         }
758         break;
759     }
760 }
761 
762 
763 /*! This function handles errors and other messages from the server.
764 
765     Uses the sqlstates specified
766     http://www.postgresql.org/docs/current/static/protocol.html
767     extensively.
768 */
769 
serverMessage()770 void Postgres::serverMessage()
771 {
772     Scope x;
773     EString s;
774     PgMessage msg( readBuffer() );
775     Query *q = d->queries.firstElement();
776     EString m( msg.message() );
777     EString code = msg.code();
778     Endpoint server( peer() );
779 
780     if ( code == "57P03" ) {
781         log( "Retrying connection after delay because PostgreSQL "
782              "is still starting up.", Log::Info );
783         close();
784         sleep( 1 );
785         connect( server );
786     }
787     else if ( code == "57P01" || code == "57P02" ) {
788         if ( code == "57P01" )
789             log( "PostgreSQL is shutting down; closing connection.", Log::Info );
790         else
791             log( "PostgreSQL reports a crash; closing connection.", Log::Info );
792         removeHandle( this );
793         if ( ::listener == this )
794             ::listener = 0;
795         close();
796         error( "PostgreSQL server shut down" );
797     }
798     else if ( code == "28000" && m.lower().containsWord( "ident" ) ) {
799         int b = m.find( '"' );
800         int e = m.find( '"', b+1 );
801         EString user( m.mid( b+1, e-b-1 ) );
802 
803         struct passwd * u = getpwnam( d->user.cstr() );
804 
805         struct passwd * p = 0;
806         const char * pg
807             = Configuration::compiledIn( Configuration::PgUser );
808 
809         if ( pg )
810             p = getpwnam( pg );
811         if ( !p )
812             p = getpwnam( "postgres" );
813         if ( !p )
814             p = getpwnam( "pgsql" );
815 
816         if ( !d->identBreakageSeen && loginAs() == DbOwner &&
817              u == 0 && p != 0 )
818         {
819             d->identBreakageSeen = true;
820             d->setSessionAuthorisation = true;
821             log( "Attempting to authenticate as superuser to use "
822                  "SET SESSION AUTHORIZATION", Log::Info );
823             d->user = EString( p->pw_name );
824             uid_t e = geteuid();
825             setreuid( 0, p->pw_uid );
826             close();
827             connect( server );
828             setreuid( 0, e );
829         }
830         else if ( s == Configuration::text(Configuration::JailUser) &&
831                   Configuration::toggle( Configuration::Security ) &&
832                   self().protocol() != Endpoint::Unix )
833         {
834             // If we connected via IPv4 or IPv6, early enough that
835             // postgres had a chance to reject us, we'll try again.
836             d->identBreakageSeen = true;
837             log( "PostgreSQL demanded IDENT, which did not match "
838                  "during startup. Retrying.", Log::Info );
839             close();
840             connect( server );
841         }
842         else {
843             log( "PostgreSQL refuses authentication because this "
844                  "process is not running as user " + user.quoted() +
845                  ". See http://aox.org/faq/mailstore#ident",
846                  Log::Disaster );
847         }
848     }
849     else if ( code == "28000" ) {
850         log( "Cannot authenticate as PostgreSQL user " + d->user.quoted() +
851              ". Server message: " + msg.message(), Log::Disaster );
852     }
853     else if ( code.startsWith( "53" ) ) {
854         uint m = Configuration::scalar( Configuration::DbMaxHandles );
855         if ( code == "53000" )
856             log( "PostgreSQL server reports too many client connections. "
857                  "Our connection count is " + fn( numHandles() ) + ", "
858                  "configured maximum is " + fn( m ) + ".",
859                  Log::Error );
860         else
861             log( "PostgreSQL server has a resource problem (" + code + "): " +
862                  msg.message(),
863                  Log::Significant );
864         if ( m > 2 ) {
865             log( "Setting db-max-handles to 2 (was " + fn( m ) + ")" );
866             Configuration::add( "db-max-handles = 2" );
867         }
868 
869     }
870     else if ( msg.type() == PgMessage::Notification ) {
871         s.append( "PostgreSQL server: " );
872         if ( q ) {
873             s.append( "Query " + q->description() + ": " );
874             x.setLog( q->log() );
875         }
876         s.append( m );
877         if ( !code.startsWith( "00" ) )
878             s.append( " (warning)" );
879         ::log( s, Log::Debug );
880     }
881     else if ( q && !code.startsWith( "00" ) ) {
882         s.append( "PostgreSQL server: " );
883         s.append( "Query " + q->description() + " failed: " );
884         x.setLog( q->log() );
885         s.append( m );
886         if ( !msg.detail().isEmpty() )
887             s.append( " (" + msg.detail() + ")" );
888         s.append( " (" + code + ")" );
889 
890         // If we sent a Parse message for a named prepared statement
891         // while processing this query, but don't already know that
892         // it succeeded, we'll assume that statement name does not
893         // exist for future use.
894         EString * pp = d->preparesPending.first();
895         if ( q->name() != "" && pp && *pp == q->name() ) {
896             d->prepared.remove( q->name() );
897             d->preparesPending.shift();
898         }
899         if ( q->inputLines() )
900             d->sendingCopy = false;
901         d->queries.shift();
902         m = mapped( m );
903         if ( !msg.detail().isEmpty() )
904             s.append( " (" + msg.detail() + ")" );
905         q->setError( m );
906         q->notify();
907     }
908     else {
909         ::log( "PostgreSQL server message could not be interpreted."
910                " Message: " + msg.message() +
911                " SQL state code: " + code +
912                " Severity: " + msg.severity().lower(),
913                Log::Error );
914     }
915 
916     if ( code.startsWith( "08" ) ) // connection exception
917         error( "PostgreSQL server error: " + s );
918 }
919 
920 
921 // these errors are based on a selection of the results from
922 // select indexname from pg_indexes where tablename in
923 //  (select tablename from pg_tables where tableowner='aoxsuper')
924 
// Table used by Postgres::mapped() to translate the name of a
// constraint or index mentioned in a server error message into a
// human-readable explanation. The table is terminated by an entry
// whose constraint pointer is 0.

static const struct {
    const char * constraint; // constraint/index name as reported by the server
    const char * human;      // explanation shown instead of the raw message
} errormap[] = {
    // some index names
    {"addresses_nld_key",
     "Operation would create two identical addresses" },
    {"u_l",
     "Operation would create two users with identical login names"},
    // some constraints from our postgresql schema
    // NOTE(review): this is a foreign key (contype f) but carries the
    // same text as the uniqueness constraint below; confirm the wording.
    {"aliases_address_fkey", // contype f
     "Operation would create two aliases with the same address"},
    {"aliases_address_key", // contype u
     "Operation would create two aliases with the same address"},
    {"annotation_names_name_key", // contype u
     "Operation would create two annotation_names rows with the same name"},
    {"annotations_mailbox_key", // contype u
     "Operation would create a duplicate annotations row"},
    {"annotations_mailbox_key1", // contype u
     "Operation would create a duplicate annotations row"},
    // XXX where does the annotations unique condition end up?
    {"deliveries_message_key", // contype u
     "Operation would store the same message for delivery twice"},
    {"field_names_name_key", // contype u
     "Operation would create two header field names with the same name"},
    {"fn_uname",
     "Operation would store two identical flag names separately"},
    {"group_members_groupname_fkey", // contype f
     "Operation would create group_members row with invalid groupname"},
    {"group_members_member_fkey", // contype f
     "Operation would create group_members row with invalid member"},
    {"group_members_pkey", // contype p
     "Operation would create duplicate group_members row"},
    // XXX shouldn't groups.name be unique? and different from all users.name?
    {"mailboxes_name_key", // contype u
     "Operation would create two mailboxes with the same name"},
    {"mailboxes_owner_fkey", // contype f
     "Operation would create a mailbox without an owner"},
    {"messages_id_key", // contype u
     "Operation would create two messages objects with the same ID"},
    {"namespaces_name_key", // contype u
     "Operation would create two user namespaces with the same name"},
    {"permissions_mailbox_fkey", // contype f
     "Operation would create a permissions row without a mailbox"},
    {"permissions_pkey", // contype p
     "Operation would create a duplicate permissions row"},
    {"scripts_owner_key", // contype u
     "Operation would store two scripts with the same owner and name"},
    // XXX shouldn't users.alias be unique?
    // NOTE(review): the text below looks like a placeholder rather than
    // an explanation; it is left unchanged until the intended wording
    // is known.
    {"users_alias_fkey", // contype f
     "users_alias"},
    {"users_parentspace_fkey", // contype f
     "Operation would create a users row without a namespace"},
    {0,0}
};
980 
981 
982 
983 /*! Looks for constraint names in \a s and returns an error message
984     corresponding to the relevant constraint. Returns \a s if it finds
985     none.
986 */
987 
mapped(const EString & s) const988 EString Postgres::mapped( const EString & s ) const
989 {
990     if ( !s.contains( "_" ) )
991         return "PostgreSQL Server: " + s;
992 
993     EString h;
994     uint maps = 0;
995     EString w;
996     uint i = 0;
997     while ( maps < 2 && i <= s.length() ) {
998         char c = s[i];
999         if ( ( c >= 'a' && c <= 'z' ) ||
1000              ( c >= 'A' && c <= 'Z' ) ||
1001              ( c >= '0' && c <= '9' ) ||
1002              ( c == '_' ) ) {
1003             w.append( c );
1004         }
1005         else if ( !w.isEmpty() ) {
1006             uint j = 0;
1007             while ( errormap[j].constraint && w != errormap[j].constraint )
1008                 j++;
1009             if ( errormap[j].constraint ) {
1010                 maps++;
1011                 h = errormap[j].human;
1012                 h.append( " (" );
1013                 h.append( w );
1014                 h.append( ")" );
1015             }
1016             w.truncate();
1017         }
1018         i++;
1019     }
1020     if ( maps != 1 )
1021         return "PostgreSQL Server: " + s;
1022 
1023     return h;
1024 }
1025 
1026 
1027 
1028 /*! Handles all protocol/socket errors by logging the error message \a s
1029     and closing the connection after emptying the write buffer and
1030     notifying any pending queries of the failure.
1031 
1032 */
1033 
error(const EString & s)1034 void Postgres::error( const EString &s )
1035 {
1036     if ( ::listener == this )
1037         ::listener = 0;
1038 
1039     Scope x( log() );
1040     ::log( s + " (on backend " + fn( connectionNumber() ) + ")", Log::Error );
1041 
1042     d->error = true;
1043     d->active = false;
1044     setState( Broken );
1045 
1046     List< Query >::Iterator q( d->queries );
1047     while ( q ) {
1048         q->setError( s );
1049         q->notify();
1050         ++q;
1051     }
1052 
1053     removeHandle( this );
1054 
1055     writeBuffer()->remove( writeBuffer()->size() );
1056     Connection::setState( Closing );
1057 }
1058 
1059 
1060 /*! Sends a termination message and takes this database handle out of
1061     circulation gracefully.
1062 */
1063 
shutdown()1064 void Postgres::shutdown()
1065 {
1066     if ( ::listener == this )
1067         ::listener = 0;
1068 
1069     PgTerminate msg;
1070     msg.enqueue( writeBuffer() );
1071 
1072     if ( d->transaction ) {
1073         d->transaction->setError( 0, "Database connection shutdown" );
1074         d->transaction->notify();
1075     }
1076     List< Query >::Iterator q( d->queries );
1077     while ( q ) {
1078         if ( !q->done() ) {
1079             q->setError( "Database connection shutdown" );
1080             q->notify();
1081         }
1082         ++q;
1083     }
1084 
1085     removeHandle( this );
1086     d->active = false;
1087 }
1088 
1089 
hasMessage(Buffer * b)1090 static bool hasMessage( Buffer *b )
1091 {
1092     if ( b->size() < 5 ||
1093          b->size() < 1+( (uint)((*b)[1]<<24)|((*b)[2]<<16)|
1094                                ((*b)[3]<<8)|((*b)[4]) ) )
1095         return false;
1096     return true;
1097 }
1098 
1099 
1100 /*! Returns true if this handle is willing to process new queries: i.e.
1101     if it has an active and error-free connection to the server, and no
1102     outstanding queries; and false otherwise.
1103 */
1104 
usable() const1105 bool Postgres::usable() const
1106 {
1107     return ( d->active && !d->startup &&
1108              !( state() == Connecting || state() == Broken ) &&
1109              d->queries.isEmpty() );
1110 }
1111 
1112 
1113 static GraphableCounter * goodQueries = 0;
1114 static GraphableCounter * badQueries = 0;
1115 
1116 
1117 /*! Updates the statistics when \a q is done. */
1118 
countQueries(class Query * q)1119 void Postgres::countQueries( class Query * q )
1120 {
1121     if ( !goodQueries ) {
1122         goodQueries = new GraphableCounter( "queries-executed" ); // bad name?
1123         badQueries = new GraphableCounter( "queries-failed" ); // bad name?
1124     }
1125 
1126     if ( !q->failed() )
1127         goodQueries->tick();
1128     else if ( !q->canFail() )
1129         badQueries->tick();
1130     ; // a query which fails but canFail is not counted anywhere.
1131 
1132     // later also use GraphableDataSet to keep track of query
1133     // execution times, but not right now.
1134 }
1135 
1136 
1137 /*! Returns the Postgres server's declared version number as an
1138     integer. 8.1.0 is returned as 80100, 8.3.2 as 80302.
1139 
1140     The version number is learned immediately after connecting.
1141     version() returns 0 until the first Postgres instance learns the
1142     server version.
1143 */
1144 
version()1145 uint Postgres::version()
1146 {
1147     return ::serverVersion;
1148 }
1149 
1150 
1151 /*! Makes sure Postgres sends as many LISTEN commands as necessary,
1152     see DatabaseSignal and
1153     http://www.postgresql.org/docs/8.1/static/sql-listen.html
1154 
1155 */
1156 
sendListen()1157 void Postgres::sendListen()
1158 {
1159     EStringList::Iterator s( DatabaseSignal::names() );
1160     while ( s ) {
1161         EString name = *s;
1162         ++s;
1163         if ( !d->listening.contains( name ) ) {
1164             d->listening.append( name );
1165             if ( !name.boring() )
1166                 name = name.quoted();
1167             processQuery( new Query( "listen " + name, 0 ) );
1168         }
1169     }
1170 }
1171 
1172 
1173 /*! Returns the query string for \a q, after possibly applying
1174     version-specific hacks and workarounds. */
1175 
queryString(Query * q)1176 EString Postgres::queryString( Query * q )
1177 {
1178     if ( !q->name().isEmpty() )
1179         return q->string();
1180 
1181     EString s( q->string() );
1182 
1183     if ( s != q->string() ) {
1184         Scope x( q->log() );
1185         log( "Changing query string to: " + s, Log::Debug );
1186     }
1187 
1188     return s;
1189 }
1190 
1191 
1192 class PgCanceller
1193     : public Postgres
1194 {
1195 private:
1196     PgKeyData * k;
1197 
1198 public:
PgCanceller(PgKeyData * key)1199     PgCanceller( PgKeyData * key )
1200         : Postgres(), k( key )
1201     {
1202         log( "Sending cancel for pid " + fn( k->pid() ), Log::Debug );
1203     }
1204 
react(Event e)1205     void react( Event e )
1206     {
1207         switch (e) {
1208         case Connect:
1209             {
1210                 PgCancel msg( k );
1211                 msg.enqueue( writeBuffer() );
1212                 Connection::setState( Closing );
1213             }
1214             break;
1215 
1216         default:
1217             break;
1218         }
1219     }
1220 };
1221 
1222 
1223 /*! Issues a cancel request for the query \a q if it is being executed
1224     by this Postgres object. If not, it does nothing.
1225 */
1226 
cancel(Query * q)1227 void Postgres::cancel( Query * q )
1228 {
1229     if ( d->queries.find( q ) )
1230         (void)new PgCanceller( d->keydata );
1231 }
1232