1 // Copyright 2009 The Archiveopteryx Developers <info@aox.org>
2 
3 #include "postgres.h"
4 
5 #include "dict.h"
6 #include "list.h"
7 #include "estring.h"
8 #include "buffer.h"
9 #include "dbsignal.h"
10 #include "allocator.h"
11 #include "configuration.h"
12 #include "transaction.h"
13 #include "estringlist.h"
14 #include "pgmessage.h"
15 #include "eventloop.h"
16 #include "graph.h"
17 #include "query.h"
18 #include "event.h"
19 #include "scope.h"
20 #include "md5.h"
21 #include "log.h"
22 
23 // crypt(), setreuid(), getpwnam()
24 #define _XOPEN_SOURCE 600
25 #include <sys/types.h>
26 #include <unistd.h>
27 #include <pwd.h>
28 
29 
30 static bool hasMessage( Buffer * );
31 static uint serverVersion;
32 static Postgres * listener = 0;
33 
34 
35 class PgData
36     : public Garbage
37 {
38 public:
PgData()39     PgData()
40         : active( false ), startup( false ), authenticated( false ),
41           unknownMessage( false ), identBreakageSeen( false ),
42           setSessionAuthorisation( false ),
43           sendingCopy( false ), error( false ),
44           keydata( 0 ),
45           description( 0 ), transaction( 0 ),
46           needNotify( 0 ), backendPid( 0 )
47         {}
48 
49     bool active;
50     bool startup;
51     bool authenticated;
52     bool unknownMessage;
53     bool identBreakageSeen;
54     bool setSessionAuthorisation;
55     bool sendingCopy;
56     bool error;
57     EStringList listening;
58 
59     PgKeyData *keydata;
60     PgRowDescription *description;
61     Dict<Postgres> prepared;
62     EStringList preparesPending;
63 
64     List< Query > queries;
65     Transaction *transaction;
66     Query * needNotify;
67 
68     EString user;
69 
70     uint backendPid;
71 
72     class LockSpotter
73         : public EventHandler {
74     public:
LockSpotter(uint p,Transaction * t)75         LockSpotter( uint p, Transaction * t ): EventHandler(), q( 0 ) {
76             setLog( new Log( t->owner()->log() ) );
77             Scope x( log() );
78             EString s(
79                 "select h.pid::int, a.xact_start::text,"
80                 " coalesce(a.client_addr::text,''::text) as client_addr, "
81                 " a.current_query::text, "
82                 " a.usename::text, "
83                 " a.current_query,"
84                 " w.locktype::text "
85                 "from pg_locks h join pg_locks w using (locktype) "
86                 "join pg_stat_activity a on (h.pid="
87             );
88             if (Postgres::version() < 90200)
89                 s.append( "a.procpid" );
90             else
91                 s.append( "a.pid" );
92             s.append(
93                 ") "
94                 "where h.granted and not w.granted and w.pid=$1 and "
95                 "coalesce(h.relation::text, h.page::text, h.tuple::text, "
96                 " h.transactionid::text, h.virtualxid)="
97                 "coalesce(w.relation::text, w.page::text, w.tuple::text, "
98                 " w.transactionid::text, w.virtualxid)"
99             );
100             q = new Query( s, this );
101             q->bind( 1, p );
102             q->execute();
103         }
execute()104         void execute() {
105             while ( q->hasResults() ) {
106                 Row * r = q->nextRow();
107                 log( "Transaction is waiting for a lock of type " +
108                      r->getEString( "locktype" ).quoted() + ". "
109                      "The following points to the lock's current holder. "
110                      "PID: " + fn( r->getInt( "pid" ) ) + " "
111                      "Transaction start time: " + r->getEString( "xact_start" ) + " "
112                      "Username: " + r->getEString( "usename" ) + " "
113                      "Client address: " + r->getEString( "client_addr" ) + " "
114                      "Current query: " + r->getEString( "current_query" ).quoted(),
115                      Log::Significant );
116             }
117         }
118         Query * q;
119     };
120 };
121 
122 
123 /*! \class Postgres postgres.h
124     Implements the PostgreSQL 3.0 Frontend-Backend protocol.
125 
126     This is our interface to PostgreSQL. As a subclass of Database, it
127     accepts Query objects, sends queries to the database, and notifies
128     callers about any resulting data. As a descendant of Connection, it
129     is responsible for all network communications with the server.
130 
131     The network protocol is documented at <doc/src/sgml/protocol.sgml>
132     and <http://www.postgresql.org/docs/current/static/protocol.html>.
133     The version implemented here is used by PostgreSQL 7.4 and later.
134 
135     At the time of writing, there do not seem to be any other suitable
136     PostgreSQL client libraries available. For example, libpqxx doesn't
137     support asynchronous operation or prepared statements. Its interface
138     would be difficult to wrap in a database-agnostic manner, and it
139     depends on the untested libpq. The others aren't much better.
140 */
141 
142 /*! Creates a Postgres object, initiates a TCP connection to the server,
143     registers with the main loop, and adds this Database to the list of
144     available handles.
145 */
146 
Postgres()147 Postgres::Postgres()
148     : Database(), d( new PgData )
149 {
150     d->user = Database::user();
151     struct passwd * p = getpwnam( d->user.cstr() );
152     if ( p && getuid() != p->pw_uid ) {
153         // Try to cooperate with ident authentication.
154         uid_t e = geteuid();
155         setreuid( 0, p->pw_uid );
156         connect( address(), port() );
157         setreuid( 0, e );
158     }
159     else {
160         connect( address(), port() );
161     }
162 
163     log( "Connecting to PostgreSQL server at " +
164          address() + ":" + fn( port() ) + " "
165          "(backend " + fn( connectionNumber() ) + ", fd " + fn( fd() ) +
166          ", user " + d->user + ")", Log::Debug );
167 
168     if ( Connection::state() != Invalid ) {
169         setTimeoutAfter( 10 );
170         EventLoop::global()->addConnection( this );
171     }
172 }
173 
174 
~Postgres()175 Postgres::~Postgres()
176 {
177     EventLoop::global()->removeConnection( this );
178 }
179 
180 
processQueue()181 void Postgres::processQueue()
182 {
183     if ( !d->queries.isEmpty() )
184         return;
185 
186     if ( d->sendingCopy )
187         return;
188 
189     if ( d->transaction &&
190          ( d->transaction->state() == Transaction::Completed ||
191            d->transaction->state() == Transaction::RolledBack ) )
192         d->transaction = 0;
193 
194     if ( !::listener && !d->transaction )
195         ::listener = this;
196     if ( ::listener == this )
197         sendListen();
198 
199     List< Query > * l = 0;
200     if ( d->transaction ) {
201         l = d->transaction->submittedQueries();
202     }
203     else {
204         if ( listener == this && numHandles() > 1 )
205             l = Database::firstSubmittedQuery( false );
206         else
207             l = Database::firstSubmittedQuery( true );
208 
209         if ( l->firstElement() && l->firstElement()->transaction() ) {
210             Transaction * t = l->firstElement()->transaction();
211             d->transaction = t;
212             t->setDatabase( this );
213         }
214     }
215 
216     Query * q = l->shift();
217     while ( q ) {
218         q->setState( Query::Executing );
219         if ( !d->error ) {
220             processQuery( q );
221         }
222         else {
223             q->setError( "Database handle no longer usable." );
224             q->notify();
225         }
226         q = l->shift();
227     }
228 
229     if ( d->queries.isEmpty() )
230         reactToIdleness();
231 }
232 
233 
234 /*! Sends whatever messages are required to make the backend process the
235     query \a q.
236 */
237 
processQuery(Query * q)238 void Postgres::processQuery( Query * q )
239 {
240     Scope x( q->log() );
241     d->queries.append( q );
242     EString s( "Sent " );
243     if ( q->name() == "" ||
244          !d->prepared.contains( q->name() ) )
245     {
246         PgParse a( queryString( q ), q->name() );
247         a.enqueue( writeBuffer() );
248 
249         if ( q->name() != "" ) {
250             d->prepared.insert( q->name(), this );
251             d->preparesPending.append( q->name() );
252         }
253 
254         s.append( "parse/" );
255     }
256 
257     PgBind b( q->name() );
258     b.bind( q->values() );
259     b.enqueue( writeBuffer() );
260 
261     PgDescribe c;
262     c.enqueue( writeBuffer() );
263 
264     PgExecute ex;
265     ex.enqueue( writeBuffer() );
266 
267     PgSync e;
268     e.enqueue( writeBuffer() );
269 
270     s.append( "execute for " );
271     s.append( q->description() );
272     s.append( " on backend " );
273     s.appendNumber( connectionNumber() );
274     ::log( s, Log::Debug );
275     recordExecution();
276 }
277 
278 
react(Event e)279 void Postgres::react( Event e )
280 {
281     switch ( e ) {
282     case Connect:
283         {
284             PgStartup msg;
285             msg.setOption( "user", d->user );
286             msg.setOption( "database", name() );
287             msg.setOption( "search_path",
288                            Configuration::text( Configuration::DbSchema ) );
289             msg.enqueue( writeBuffer() );
290 
291             d->active = true;
292             d->startup = true;
293         }
294         break;
295 
296     case Read:
297         while ( d->active && hasMessage( readBuffer() ) ) {
298             /* We call a function to process every message we receive.
299                This function is expected to parse and remove a message
300                from the readBuffer, throwing an exception for malformed
301                messages, and setting d->unknownMessage for messages that
302                it can't or won't handle. */
303 
304             char msg = (*readBuffer())[0];
305             try {
306                 if ( d->startup ) {
307                     if ( !d->authenticated )
308                         authentication( msg );
309                     else
310                         backendStartup( msg );
311                 }
312                 else {
313                     process( msg );
314                 }
315 
316                 if ( d->unknownMessage )
317                     unknown( msg );
318             }
319             catch ( PgServerMessage::Error e ) {
320                 error( "Malformed " + EString( &msg, 1 ).quoted() +
321                        " message received." );
322             }
323         }
324         if ( d->needNotify )
325             d->needNotify->notify();
326         d->needNotify = 0;
327 
328         break;
329 
330     case Error:
331         error( "Couldn't connect to PostgreSQL." );
332         break;
333 
334     case Close:
335         if ( d->active )
336             error( "Connection terminated by the server." );
337         break;
338 
339     case Timeout:
340         if ( d->transaction &&
341              ( d->transaction->state() == Transaction::Completed ||
342                d->transaction->state() == Transaction::RolledBack ) )
343             d->transaction = 0;
344 
345         if ( !d->active || d->startup ) {
346             error( "Timeout negotiating connection to PostgreSQL." );
347         }
348         else if ( d->transaction ) {
349             if ( d->queries.isEmpty() )
350                 d->transaction->rollback();
351             else if ( d->backendPid )
352                 new PgData::LockSpotter( d->backendPid, d->transaction );
353             else
354                 log( "Transaction unexpectedly slow; continuing " );
355         }
356         else if ( d->queries.isEmpty() &&
357                   ::listener != this &&
358                   server().protocol() != Endpoint::Unix &&
359                   handlesNeeded() < numHandles() ) {
360             log( "Closing idle database backend " + fn( connectionNumber() ) +
361                  " (" + fn( numHandles()-1 ) + " remaining)" );
362             shutdown();
363         }
364         break;
365 
366     case Shutdown:
367         shutdown();
368         break;
369     }
370 
371     if ( usable() ) {
372         processQueue();
373         if ( d->queries.isEmpty() && !d->transaction ) {
374             uint interval =
375                 Configuration::scalar( Configuration::DbHandleInterval );
376             if ( ::listener == this )
377                 interval = interval * 2;
378             else if ( idleHandles() > 2 && interval > 20 )
379                 interval = 20;
380             setTimeoutAfter( interval );
381         }
382     }
383     if ( d->transaction && d->queries.isEmpty() ) {
384         // if the transaction doesn't do anything, just sits there
385         // holding its handle, we have to rollback() in order to free
386         // the handle for better work. we do it more quickly for a
387         // broken transaction than for one that seems fine.
388         if ( d->transaction->state() == Transaction::Failed )
389             setTimeoutAfter( 5 );
390         else
391             setTimeoutAfter( 20 );
392     }
393 }
394 
395 
396 /*! This function handles the authentication phase of the protocol. It
397     expects and responds to an authentication request, and waits for a
398     positive response before entering the backend startup phase. It is
399     called by react with the \a type of the message to process.
400 */
401 
authentication(char type)402 void Postgres::authentication( char type )
403 {
404     switch ( type ) {
405     case 'R':
406         {
407             PgAuthRequest r( readBuffer() );
408 
409             switch ( r.type() ) {
410             case PgAuthRequest::Success:
411                 d->authenticated = true;
412                 break;
413 
414             case PgAuthRequest::Password:
415             case PgAuthRequest::Crypt:
416             case PgAuthRequest::MD5:
417                 {
418                     EString pass = password();
419 
420                     if ( d->setSessionAuthorisation ) {
421                         error( "Cannot supply credentials during proxy "
422                                "authentication" );
423                         return;
424                     }
425 
426                     if ( r.type() == PgAuthRequest::Crypt )
427                         pass = ::crypt( pass.cstr(), r.salt().cstr() );
428                     else if ( r.type() == PgAuthRequest::MD5 )
429                         pass = "md5" + MD5::hash(
430                                            MD5::hash(
431                                                pass + d->user
432                                            ).hex() + r.salt()
433                                        ).hex();
434 
435                     PgPasswordMessage p( pass );
436                     p.enqueue( writeBuffer() );
437                 }
438                 break;
439 
440             default:
441                 error( "Unsupported PgAuthRequest." );
442                 break;
443             }
444         }
445         break;
446 
447     default:
448         d->unknownMessage = true;
449         break;
450     }
451 }
452 
453 
454 /*! This function negotiates the backend startup phase of the protocol
455     (storing any messages the server sends us), concluding the startup
456     process when the server indicates that it is ready for queries. It
457     is called by react() with the \a type of the message to process.
458 */
459 
backendStartup(char type)460 void Postgres::backendStartup( char type )
461 {
462     switch ( type ) {
463     case 'Z':
464         setTimeout( 0 );
465         d->startup = false;
466         if ( CitextLookup::necessary() )
467             processQuery( (new CitextLookup())->q );
468         addHandle( this );
469 
470         // This successfully concludes connection startup. We'll leave
471         // this message unparsed, so that process() can handle it like
472         // any other PgReady.
473 
474         if ( d->setSessionAuthorisation )
475             processQuery( new Query( "SET SESSION AUTHORIZATION " +
476                                      Database::user(), 0 ) );
477 
478         break;
479 
480     case 'K':
481         d->keydata = new PgKeyData( readBuffer() );
482         log( "Postgres backend " + fn( connectionNumber() ) +
483              " has pid " + fn( d->keydata->pid() ), Log::Debug );
484         d->backendPid = d->keydata->pid();
485         break;
486 
487     default:
488         d->unknownMessage = true;
489         break;
490     }
491 }
492 
493 
494 /*! This function handles interaction with the server once the startup
495     phase is complete. It is called by react() with the \a type of the
496     message to process.
497 */
498 
process(char type)499 void Postgres::process( char type )
500 {
501     Query * q = d->queries.firstElement();
502     Scope x;
503     if ( q && q->log() )
504         x.setLog( q->log() );
505 
506     extendTimeout( 5 );
507 
508     switch ( type ) {
509     case '1':
510         {
511             PgParseComplete msg( readBuffer() );
512             if ( q && q->name() != "" )
513                 d->preparesPending.shift();
514         }
515         break;
516 
517     case '2':
518         {
519             PgBindComplete msg( readBuffer() );
520         }
521         break;
522 
523     case 'n':
524         {
525             PgNoData msg( readBuffer() );
526         }
527         break;
528 
529     case 't':
530         (void)new PgParameterDescription( readBuffer() );
531         break;
532 
533     case 'G':
534         {
535             PgCopyInResponse msg( readBuffer() );
536             if ( q && q->inputLines() ) {
537                 log( "Sending " + fn( q->inputLines()->count() ) +
538                      " data rows",
539                      Log::Debug );
540                 PgCopyData cd( q );
541                 PgCopyDone e;
542 
543                 cd.enqueue( writeBuffer() );
544                 e.enqueue( writeBuffer() );
545             }
546             else {
547                 PgCopyFail f;
548                 f.enqueue( writeBuffer() );
549             }
550 
551             PgSync s;
552             s.enqueue( writeBuffer() );
553             d->sendingCopy = false;
554         }
555         break;
556 
557     case 'T':
558         d->description = new PgRowDescription( readBuffer() );
559         break;
560 
561     case 'D':
562         {
563             if ( !q || !d->description ) {
564                 error( "Unexpected data row" );
565                 return;
566             }
567 
568             PgDataRow msg( readBuffer(), d->description );
569             q->addRow( msg.row() );
570             if ( d->needNotify && d->needNotify != q )
571                 d->needNotify->notify();
572             d->needNotify = q;
573         }
574         break;
575 
576     case 'I':
577     case 'C':
578         {
579             PgCommandComplete * cc = 0;
580             if ( type == 'C' )
581                 cc = new PgCommandComplete( readBuffer() );
582             else
583                 PgEmptyQueryResponse msg( readBuffer() );
584 
585             if ( q ) {
586                 EString s;
587                 s.append( "Dequeueing query " );
588                 s.append( q->description() );
589                 s.append( " on backend " );
590                 s.appendNumber( connectionNumber() );
591                 EString command;
592                 if ( cc )
593                     command = cc->tag().section( " ", 1 );
594                 if ( cc && !q->rows() ) {
595                     uint an = 2;
596                     if ( command == "INSERT" )
597                         an = 3;
598                     q->setRows( cc->tag().section( " ", an ).number( 0 ) );
599                 }
600                 if ( q->rows() ||
601                      command == "SELECT" || command == "FETCH" ||
602                      command == "INSERT" || command == "UPDATE" ) {
603                     s.append( " (with " );
604                     s.appendNumber( q->rows() );
605                     s.append( " rows)" );
606                 }
607                 ::log( s, Log::Info );
608                 if ( !q->done() ) {
609                     q->setState( Query::Completed );
610                     countQueries( q );
611                 }
612                 d->queries.shift();
613                 q->notify();
614                 d->needNotify = 0;
615             }
616         }
617         break;
618 
619     case 'Z':
620         {
621             PgReady msg( readBuffer() );
622             setState( msg.state() );
623         }
624         break;
625 
626     case 'A':
627         {
628             PgNotificationResponse msg( readBuffer() );
629             EString s;
630             if ( !msg.source().isEmpty() )
631                 s = " (" + msg.source() + ")";
632             log( "Received notify " + msg.name().quoted() +
633                  " from server pid " + fn( msg.pid() ) + s, Log::Debug );
634             DatabaseSignal::notifyAll( msg.name() );
635         }
636         break;
637 
638     default:
639         d->unknownMessage = true;
640         break;
641     }
642 }
643 
644 
645 /*! This function handles unknown or unwanted messages that some other
646     function declined to process (by setting d->unknownMessage). It is
647     called by react() with the \a type of the unknown message.
648 */
649 
unknown(char type)650 void Postgres::unknown( char type )
651 {
652     switch ( type ) {
653     case 'S':
654         {
655             d->unknownMessage = false;
656             PgParameterStatus msg( readBuffer() );
657 
658             EString n = msg.name();
659             EString v = msg.value();
660             EString e;
661             bool known = true;
662             if ( n == "client_encoding" ) {
663                 if ( v != "UTF8" && v != "SQL_ASCII" )
664                     e = "Unexpected client encoding: ";
665             }
666             else if ( n == "DateStyle" ) {
667                 // we want ISO on the list somewhere
668                 if ( !v.containsWord( "ISO" ) )
669                     e = "DateStyle apparently does not support ISO: ";
670             }
671             else if ( n == "integer_datetimes" ) {
672                 // PG documentation says:
673                 //     "Use 64-bit integer storage for datetimes and
674                 //     intervals, rather than the default floating-point
675                 //     storage. This reduces the range of representable
676                 //     values but guarantees microsecond precision across
677                 //     the full range (see Section 8.5 for more
678                 //     information)."
679                 // We don't care about that. Email uses only seconds,
680                 // and only a fairly limited time range. Both on and
681                 // off are okay.
682             }
683             else if ( n == "is_superuser" ) {
684                 if ( v.simplified().lower() != "off" )
685                     e = "Connected as superuser: ";
686             }
687             else if ( n == "server_encoding" ) {
688                 if ( v != "UTF8" && v != "SQL_ASCII" )
689                     e = "Unexpected server encoding: ";
690             }
691             else if ( n == "server_version" ) {
692                 // We're forever doomed to parse version strings to
693                 // recompute the server_version_num that the server
694                 // knows but purposely doesn't send us because:
695                 //
696                 // "I think this is just a waste of network bandwidth.
697                 // No client-side code could safely depend on its being
698                 // available for many years yet, therefore they're going
699                 // to keep using server_version." (2015-01-09)
700                 bool ok = true;
701 
702                 serverVersion = 10000 * v.section( ".", 1 ).number( &ok );
703                 if (ok && version() < 100000)
704                     serverVersion += 100 * v.section( ".", 2 ).number( &ok );
705                 if ( !ok || version() < 90100 )
706                     e = "Archiveopteryx requires PostgreSQL 9.1 or higher: ";
707             }
708             else if ( n == "session_authorization" ) {
709                 // we could test that v is d->user, but I don't think
710                 // we care. besides it might sound an alarm about our
711                 // ident workarounds.
712             }
713             else if ( n == "standard_conforming_strings" ) {
714                 // hm... ?
715             }
716             else if ( n == "TimeZone" ) {
717                 // we don't care.
718             }
719             else {
720                 known = false;
721             }
722             if ( known && e.isEmpty() ) {
723                 // we're entirely silent about this. all is well.
724             }
725             else {
726                 EString s( "PostgreSQL server: " );
727                 if ( e.isEmpty() )
728                     s.append( "SET " );
729                 else
730                     s.append( e );
731                 s.append( n );
732                 s.append( "=" );
733                 s.append( v.quoted() );
734                 if ( e.isEmpty() )
735                     ::log( s, Log::Debug );
736                 else
737                     ::log( s );
738             }
739         }
740         break;
741 
742     case 'N':
743     case 'E':
744         d->unknownMessage = false;
745         serverMessage();
746         break;
747 
748     default:
749         {
750             EString err = "Unexpected message (";
751 
752             if ( type > 32 && type < 127 )
753                 err.append( type );
754             else
755                 err.append( "%" + fn( (int)type, 16 ) );
756 
757             err.append( ") received" );
758             if ( d->startup ) {
759                 if ( !d->authenticated )
760                     err.append( " during authentication" );
761                 else
762                     err.append( " during backend startup" );
763             }
764             err.append( "." );
765             error( err );
766         }
767         break;
768     }
769 }
770 
771 
772 /*! This function handles errors and other messages from the server.
773 
774     Uses the sqlstates specified
775     http://www.postgresql.org/docs/current/static/protocol.html
776     extensively.
777 */
778 
serverMessage()779 void Postgres::serverMessage()
780 {
781     Scope x;
782     EString s;
783     PgMessage msg( readBuffer() );
784     Query *q = d->queries.firstElement();
785     EString m( msg.message() );
786     EString code = msg.code();
787     Endpoint server( peer() );
788 
789     if ( code == "57P03" ) {
790         log( "Retrying connection after delay because PostgreSQL "
791              "is still starting up.", Log::Info );
792         close();
793         sleep( 1 );
794         connect( server );
795     }
796     else if ( code == "57P01" || code == "57P02" ) {
797         if ( code == "57P01" )
798             log( "PostgreSQL is shutting down; closing connection.", Log::Info );
799         else
800             log( "PostgreSQL reports a crash; closing connection.", Log::Info );
801         removeHandle( this );
802         if ( ::listener == this )
803             ::listener = 0;
804         close();
805         error( "PostgreSQL server shut down" );
806     }
807     else if ( code == "28000" && m.lower().containsWord( "ident" ) ) {
808         int b = m.find( '"' );
809         int e = m.find( '"', b+1 );
810         EString user( m.mid( b+1, e-b-1 ) );
811 
812         struct passwd * u = getpwnam( d->user.cstr() );
813 
814         struct passwd * p = 0;
815         const char * pg
816             = Configuration::compiledIn( Configuration::PgUser );
817 
818         if ( pg )
819             p = getpwnam( pg );
820         if ( !p )
821             p = getpwnam( "postgres" );
822         if ( !p )
823             p = getpwnam( "pgsql" );
824 
825         if ( !d->identBreakageSeen && loginAs() == DbOwner &&
826              u == 0 && p != 0 )
827         {
828             d->identBreakageSeen = true;
829             d->setSessionAuthorisation = true;
830             log( "Attempting to authenticate as superuser to use "
831                  "SET SESSION AUTHORIZATION", Log::Info );
832             d->user = EString( p->pw_name );
833             uid_t e = geteuid();
834             setreuid( 0, p->pw_uid );
835             close();
836             connect( server );
837             setreuid( 0, e );
838         }
839         else if ( s == Configuration::text(Configuration::JailUser) &&
840                   Configuration::toggle( Configuration::Security ) &&
841                   self().protocol() != Endpoint::Unix )
842         {
843             // If we connected via IPv4 or IPv6, early enough that
844             // postgres had a chance to reject us, we'll try again.
845             d->identBreakageSeen = true;
846             log( "PostgreSQL demanded IDENT, which did not match "
847                  "during startup. Retrying.", Log::Info );
848             close();
849             connect( server );
850         }
851         else {
852             log( "PostgreSQL refuses authentication because this "
853                  "process is not running as user " + user.quoted() +
854                  ". See http://aox.org/faq/mailstore#ident",
855                  Log::Disaster );
856         }
857     }
858     else if ( code == "28000" ) {
859         log( "Cannot authenticate as PostgreSQL user " + d->user.quoted() +
860              ". Server message: " + msg.message(), Log::Disaster );
861     }
862     else if ( code.startsWith( "53" ) ) {
863         uint m = Configuration::scalar( Configuration::DbMaxHandles );
864         if ( code == "53000" )
865             log( "PostgreSQL server reports too many client connections. "
866                  "Our connection count is " + fn( numHandles() ) + ", "
867                  "configured maximum is " + fn( m ) + ".",
868                  Log::Error );
869         else
870             log( "PostgreSQL server has a resource problem (" + code + "): " +
871                  msg.message(),
872                  Log::Significant );
873         if ( m > 2 ) {
874             log( "Setting db-max-handles to 2 (was " + fn( m ) + ")" );
875             Configuration::add( "db-max-handles = 2" );
876         }
877 
878     }
879     else if ( msg.type() == PgMessage::Notification ) {
880         s.append( "PostgreSQL server: " );
881         if ( q ) {
882             s.append( "Query " + q->description() + ": " );
883             x.setLog( q->log() );
884         }
885         s.append( m );
886         if ( !code.startsWith( "00" ) )
887             s.append( " (warning)" );
888         ::log( s, Log::Debug );
889     }
890     else if ( q && !code.startsWith( "00" ) ) {
891         s.append( "PostgreSQL server: " );
892         s.append( "Query " + q->description() + " failed: " );
893         x.setLog( q->log() );
894         s.append( m );
895         if ( !msg.detail().isEmpty() )
896             s.append( " (" + msg.detail() + ")" );
897         s.append( " (" + code + ")" );
898 
899         // If we sent a Parse message for a named prepared statement
900         // while processing this query, but don't already know that
901         // it succeeded, we'll assume that statement name does not
902         // exist for future use.
903         EString * pp = d->preparesPending.first();
904         if ( q->name() != "" && pp && *pp == q->name() ) {
905             d->prepared.remove( q->name() );
906             d->preparesPending.shift();
907         }
908         if ( q->inputLines() )
909             d->sendingCopy = false;
910         d->queries.shift();
911         m = mapped( m );
912         if ( !msg.detail().isEmpty() )
913             s.append( " (" + msg.detail() + ")" );
914         q->setError( m );
915         q->notify();
916     }
917     else {
918         ::log( "PostgreSQL server message could not be interpreted."
919                " Message: " + msg.message() +
920                " SQL state code: " + code +
921                " Severity: " + msg.severity().lower(),
922                Log::Error );
923     }
924 
925     if ( code.startsWith( "08" ) ) // connection exception
926         error( "PostgreSQL server error: " + s );
927 }
928 
929 
930 // these errors are based on a selection of the results from
931 // select indexname from pg_indexes where tablename in
932 //  (select tablename from pg_tables where tableowner='aoxsuper')
933 
934 static const struct {
935     const char * constraint;
936     const char * human;
937 } errormap[] = {
938     // some index names
939     {"addresses_nld_key",
940      "Operation would create two identical addresses" },
941     {"u_l",
942      "Operation wold create two users with identical login names"},
943     // some constraints from our postgresql schema
944     {"aliases_address_fkey", // contype f
945      "Operation would create two aliases with the same address"},
946     {"aliases_address_key", // contype u
947      "Operation would create two aliases with the same address"},
948     {"annotation_names_name_key", // contype u
949      "Operation would create two annotation_names rows with the same_name"},
950     {"annotations_mailbox_key", // contype u
951      "Operation would create a duplicate annotations row"},
952     {"annotations_mailbox_key1", // contype u
953      "Operation would create a duplicate annotations row"},
954     // XXX where does the annotations unique condition end up?
955     {"deliveries_message_key", // contype u
956      "Operation would store the same message for delivery twice"},
957     {"field_names_name_key", // contype u
958      "Operation would create two header field names with the same name"},
959     {"fn_uname",
960      "Operation would store two identical flag names separately"},
961     {"group_members_groupname_fkey", // contype f
962      "Operation would create group_members row with invalid groupname"},
963     {"group_members_member_fkey", // contype f
964      "Operation would create group_members row with invalid member"},
965     {"group_members_pkey", // contype p
966      "Operation would create duplicate group_members row"},
967     // XXX shouldb't groups.name be unique? and different from all users.name?
968     {"mailboxes_name_key", // contype u
969      "Operation would create two mailboxes with the same name"},
970     {"mailboxes_owner_fkey", // contype f
971      "Operation would create a mailbox without an owner"},
972     {"messages_id_key", // contype u
973      "Opeation would create two messages objects with the same ID"},
974     {"namespaces_name_key", // contype u
975      "Operation would create two user namespaces with the same name"},
976     {"permissions_mailbox_fkey", // contype f
977      "Operation would create a permissions row without a mailbox"},
978     {"permissions_pkey", // contype p
979      "Operation would create a duplicate permissions row"},
980     {"scripts_owner_key", // contype u
981      "Operation would store two scripts with the same owner and name"},
982     // XXX shouldn't users.alias be unique?
983     {"users_alias_fkey", // contype f
984      "users_alias"},
985     {"users_parentspace_fkey", // contype f
986      "Operation would create a users row without a namespace"},
987     {0,0}
988 };
989 
990 
991 
992 /*! Looks for constraint names in \a s and returns an error message
993     corresponding to the relevant constraint. Returns \a s if it finds
994     none.
995 */
996 
mapped(const EString & s) const997 EString Postgres::mapped( const EString & s ) const
998 {
999     if ( !s.contains( "_" ) )
1000         return "PostgreSQL Server: " + s;
1001 
1002     EString h;
1003     uint maps = 0;
1004     EString w;
1005     uint i = 0;
1006     while ( maps < 2 && i <= s.length() ) {
1007         char c = s[i];
1008         if ( ( c >= 'a' && c <= 'z' ) ||
1009              ( c >= 'A' && c <= 'Z' ) ||
1010              ( c >= '0' && c <= '9' ) ||
1011              ( c == '_' ) ) {
1012             w.append( c );
1013         }
1014         else if ( !w.isEmpty() ) {
1015             uint j = 0;
1016             while ( errormap[j].constraint && w != errormap[j].constraint )
1017                 j++;
1018             if ( errormap[j].constraint ) {
1019                 maps++;
1020                 h = errormap[j].human;
1021                 h.append( " (" );
1022                 h.append( w );
1023                 h.append( ")" );
1024             }
1025             w.truncate();
1026         }
1027         i++;
1028     }
1029     if ( maps != 1 )
1030         return "PostgreSQL Server: " + s;
1031 
1032     return h;
1033 }
1034 
1035 
1036 
1037 /*! Handles all protocol/socket errors by logging the error message \a s
1038     and closing the connection after emptying the write buffer and
1039     notifying any pending queries of the failure.
1040 
1041 */
1042 
error(const EString & s)1043 void Postgres::error( const EString &s )
1044 {
1045     if ( ::listener == this )
1046         ::listener = 0;
1047 
1048     Scope x( log() );
1049     ::log( s + " (on backend " + fn( connectionNumber() ) + ")", Log::Error );
1050 
1051     d->error = true;
1052     d->active = false;
1053     setState( Broken );
1054 
1055     List< Query >::Iterator q( d->queries );
1056     while ( q ) {
1057         q->setError( s );
1058         q->notify();
1059         ++q;
1060     }
1061 
1062     removeHandle( this );
1063 
1064     writeBuffer()->remove( writeBuffer()->size() );
1065     Connection::setState( Closing );
1066 }
1067 
1068 
1069 /*! Sends a termination message and takes this database handle out of
1070     circulation gracefully.
1071 */
1072 
shutdown()1073 void Postgres::shutdown()
1074 {
1075     if ( ::listener == this )
1076         ::listener = 0;
1077 
1078     PgTerminate msg;
1079     msg.enqueue( writeBuffer() );
1080 
1081     if ( d->transaction ) {
1082         d->transaction->setError( 0, "Database connection shutdown" );
1083         d->transaction->notify();
1084     }
1085     List< Query >::Iterator q( d->queries );
1086     while ( q ) {
1087         if ( !q->done() ) {
1088             q->setError( "Database connection shutdown" );
1089             q->notify();
1090         }
1091         ++q;
1092     }
1093 
1094     removeHandle( this );
1095     d->active = false;
1096 }
1097 
1098 
hasMessage(Buffer * b)1099 static bool hasMessage( Buffer *b )
1100 {
1101     if ( b->size() < 5 ||
1102          b->size() < 1+( (uint)((*b)[1]<<24)|((*b)[2]<<16)|
1103                                ((*b)[3]<<8)|((*b)[4]) ) )
1104         return false;
1105     return true;
1106 }
1107 
1108 
1109 /*! Returns true if this handle is willing to process new queries: i.e.
1110     if it has an active and error-free connection to the server, and no
1111     outstanding queries; and false otherwise.
1112 */
1113 
usable() const1114 bool Postgres::usable() const
1115 {
1116     return ( d->active && !d->startup &&
1117              !( state() == Connecting || state() == Broken ) &&
1118              d->queries.isEmpty() );
1119 }
1120 
1121 
1122 static GraphableCounter * goodQueries = 0;
1123 static GraphableCounter * badQueries = 0;
1124 
1125 
1126 /*! Updates the statistics when \a q is done. */
1127 
countQueries(class Query * q)1128 void Postgres::countQueries( class Query * q )
1129 {
1130     if ( !goodQueries ) {
1131         goodQueries = new GraphableCounter( "queries-executed" ); // bad name?
1132         badQueries = new GraphableCounter( "queries-failed" ); // bad name?
1133     }
1134 
1135     if ( !q->failed() )
1136         goodQueries->tick();
1137     else if ( !q->canFail() )
1138         badQueries->tick();
1139     ; // a query which fails but canFail is not counted anywhere.
1140 
1141     // later also use GraphableDataSet to keep track of query
1142     // execution times, but not right now.
1143 }
1144 
1145 
1146 /*! Returns the Postgres server's declared version number as an
1147     integer. 8.1.0 is returned as 80100, 8.3.2 as 80302.
1148 
1149     The version number is learned immediately after connecting.
1150     version() returns 0 until the first Postgres instance learns the
1151     server version.
1152 */
1153 
version()1154 uint Postgres::version()
1155 {
1156     return ::serverVersion;
1157 }
1158 
1159 
1160 /*! Makes sure Postgres sends as many LISTEN commands as necessary,
1161     see DatabaseSignal and
1162     http://www.postgresql.org/docs/8.1/static/sql-listen.html
1163 
1164 */
1165 
sendListen()1166 void Postgres::sendListen()
1167 {
1168     EStringList::Iterator s( DatabaseSignal::names() );
1169     while ( s ) {
1170         EString name = *s;
1171         ++s;
1172         if ( !d->listening.contains( name ) ) {
1173             d->listening.append( name );
1174             if ( !name.boring() )
1175                 name = name.quoted();
1176             processQuery( new Query( "listen " + name, 0 ) );
1177         }
1178     }
1179 }
1180 
1181 
1182 /*! Returns the query string for \a q, after possibly applying
1183     version-specific hacks and workarounds. */
1184 
queryString(Query * q)1185 EString Postgres::queryString( Query * q )
1186 {
1187     if ( !q->name().isEmpty() )
1188         return q->string();
1189 
1190     EString s( q->string() );
1191 
1192     if ( s != q->string() ) {
1193         Scope x( q->log() );
1194         log( "Changing query string to: " + s, Log::Debug );
1195     }
1196 
1197     return s;
1198 }
1199 
1200 
1201 class PgCanceller
1202     : public Postgres
1203 {
1204 private:
1205     PgKeyData * k;
1206 
1207 public:
PgCanceller(PgKeyData * key)1208     PgCanceller( PgKeyData * key )
1209         : Postgres(), k( key )
1210     {
1211         log( "Sending cancel for pid " + fn( k->pid() ), Log::Debug );
1212     }
1213 
react(Event e)1214     void react( Event e )
1215     {
1216         switch (e) {
1217         case Connect:
1218             {
1219                 PgCancel msg( k );
1220                 msg.enqueue( writeBuffer() );
1221                 Connection::setState( Closing );
1222             }
1223             break;
1224 
1225         default:
1226             break;
1227         }
1228     }
1229 };
1230 
1231 
1232 /*! Issues a cancel request for the query \a q if it is being executed
1233     by this Postgres object. If not, it does nothing.
1234 */
1235 
cancel(Query * q)1236 void Postgres::cancel( Query * q )
1237 {
1238     if ( d->queries.find( q ) )
1239         (void)new PgCanceller( d->keydata );
1240 }
1241