1 // Copyright 2009 The Archiveopteryx Developers <info@aox.org>
2
3 #include "postgres.h"
4
5 #include "dict.h"
6 #include "list.h"
7 #include "estring.h"
8 #include "buffer.h"
9 #include "dbsignal.h"
10 #include "allocator.h"
11 #include "configuration.h"
12 #include "transaction.h"
13 #include "estringlist.h"
14 #include "pgmessage.h"
15 #include "eventloop.h"
16 #include "graph.h"
17 #include "query.h"
18 #include "event.h"
19 #include "scope.h"
20 #include "md5.h"
21 #include "log.h"
22
23 // crypt(), setreuid(), getpwnam()
24 #define _XOPEN_SOURCE 600
25 #include <sys/types.h>
26 #include <unistd.h>
27 #include <pwd.h>
28
29
30 static bool hasMessage( Buffer * );
31 static uint serverVersion;
32 static Postgres * listener = 0;
33
34
35 class PgData
36 : public Garbage
37 {
38 public:
PgData()39 PgData()
40 : active( false ), startup( false ), authenticated( false ),
41 unknownMessage( false ), identBreakageSeen( false ),
42 setSessionAuthorisation( false ),
43 sendingCopy( false ), error( false ),
44 keydata( 0 ),
45 description( 0 ), transaction( 0 ),
46 needNotify( 0 ), backendPid( 0 )
47 {}
48
49 bool active;
50 bool startup;
51 bool authenticated;
52 bool unknownMessage;
53 bool identBreakageSeen;
54 bool setSessionAuthorisation;
55 bool sendingCopy;
56 bool error;
57 EStringList listening;
58
59 PgKeyData *keydata;
60 PgRowDescription *description;
61 Dict<Postgres> prepared;
62 EStringList preparesPending;
63
64 List< Query > queries;
65 Transaction *transaction;
66 Query * needNotify;
67
68 EString user;
69
70 uint backendPid;
71
72 class LockSpotter
73 : public EventHandler {
74 public:
LockSpotter(uint p,Transaction * t)75 LockSpotter( uint p, Transaction * t ): EventHandler(), q( 0 ) {
76 setLog( new Log( t->owner()->log() ) );
77 Scope x( log() );
78 EString s(
79 "select h.pid::int, a.xact_start::text,"
80 " coalesce(a.client_addr::text,''::text) as client_addr, "
81 " a.current_query::text, "
82 " a.usename::text, "
83 " a.current_query,"
84 " w.locktype::text "
85 "from pg_locks h join pg_locks w using (locktype) "
86 "join pg_stat_activity a on (h.pid="
87 );
88 if (Postgres::version() < 90200)
89 s.append( "a.procpid" );
90 else
91 s.append( "a.pid" );
92 s.append(
93 ") "
94 "where h.granted and not w.granted and w.pid=$1 and "
95 "coalesce(h.relation::text, h.page::text, h.tuple::text, "
96 " h.transactionid::text, h.virtualxid)="
97 "coalesce(w.relation::text, w.page::text, w.tuple::text, "
98 " w.transactionid::text, w.virtualxid)"
99 );
100 q = new Query( s, this );
101 q->bind( 1, p );
102 q->execute();
103 }
execute()104 void execute() {
105 while ( q->hasResults() ) {
106 Row * r = q->nextRow();
107 log( "Transaction is waiting for a lock of type " +
108 r->getEString( "locktype" ).quoted() + ". "
109 "The following points to the lock's current holder. "
110 "PID: " + fn( r->getInt( "pid" ) ) + " "
111 "Transaction start time: " + r->getEString( "xact_start" ) + " "
112 "Username: " + r->getEString( "usename" ) + " "
113 "Client address: " + r->getEString( "client_addr" ) + " "
114 "Current query: " + r->getEString( "current_query" ).quoted(),
115 Log::Significant );
116 }
117 }
118 Query * q;
119 };
120 };
121
122
123 /*! \class Postgres postgres.h
124 Implements the PostgreSQL 3.0 Frontend-Backend protocol.
125
126 This is our interface to PostgreSQL. As a subclass of Database, it
127 accepts Query objects, sends queries to the database, and notifies
128 callers about any resulting data. As a descendant of Connection, it
129 is responsible for all network communications with the server.
130
131 The network protocol is documented at <doc/src/sgml/protocol.sgml>
132 and <http://www.postgresql.org/docs/current/static/protocol.html>.
133 The version implemented here is used by PostgreSQL 7.4 and later.
134
135 At the time of writing, there do not seem to be any other suitable
136 PostgreSQL client libraries available. For example, libpqxx doesn't
137 support asynchronous operation or prepared statements. Its interface
138 would be difficult to wrap in a database-agnostic manner, and it
139 depends on the untested libpq. The others aren't much better.
140 */
141
142 /*! Creates a Postgres object, initiates a TCP connection to the server,
143 registers with the main loop, and adds this Database to the list of
144 available handles.
145 */
146
Postgres()147 Postgres::Postgres()
148 : Database(), d( new PgData )
149 {
150 d->user = Database::user();
151 struct passwd * p = getpwnam( d->user.cstr() );
152 if ( p && getuid() != p->pw_uid ) {
153 // Try to cooperate with ident authentication.
154 uid_t e = geteuid();
155 setreuid( 0, p->pw_uid );
156 connect( address(), port() );
157 setreuid( 0, e );
158 }
159 else {
160 connect( address(), port() );
161 }
162
163 log( "Connecting to PostgreSQL server at " +
164 address() + ":" + fn( port() ) + " "
165 "(backend " + fn( connectionNumber() ) + ", fd " + fn( fd() ) +
166 ", user " + d->user + ")", Log::Debug );
167
168 if ( Connection::state() != Invalid ) {
169 setTimeoutAfter( 10 );
170 EventLoop::global()->addConnection( this );
171 }
172 }
173
174
~Postgres()175 Postgres::~Postgres()
176 {
177 EventLoop::global()->removeConnection( this );
178 }
179
180
processQueue()181 void Postgres::processQueue()
182 {
183 if ( !d->queries.isEmpty() )
184 return;
185
186 if ( d->sendingCopy )
187 return;
188
189 if ( d->transaction &&
190 ( d->transaction->state() == Transaction::Completed ||
191 d->transaction->state() == Transaction::RolledBack ) )
192 d->transaction = 0;
193
194 if ( !::listener && !d->transaction )
195 ::listener = this;
196 if ( ::listener == this )
197 sendListen();
198
199 List< Query > * l = 0;
200 if ( d->transaction ) {
201 l = d->transaction->submittedQueries();
202 }
203 else {
204 if ( listener == this && numHandles() > 1 )
205 l = Database::firstSubmittedQuery( false );
206 else
207 l = Database::firstSubmittedQuery( true );
208
209 if ( l->firstElement() && l->firstElement()->transaction() ) {
210 Transaction * t = l->firstElement()->transaction();
211 d->transaction = t;
212 t->setDatabase( this );
213 }
214 }
215
216 Query * q = l->shift();
217 while ( q ) {
218 q->setState( Query::Executing );
219 if ( !d->error ) {
220 processQuery( q );
221 }
222 else {
223 q->setError( "Database handle no longer usable." );
224 q->notify();
225 }
226 q = l->shift();
227 }
228
229 if ( d->queries.isEmpty() )
230 reactToIdleness();
231 }
232
233
234 /*! Sends whatever messages are required to make the backend process the
235 query \a q.
236 */
237
processQuery(Query * q)238 void Postgres::processQuery( Query * q )
239 {
240 Scope x( q->log() );
241 d->queries.append( q );
242 EString s( "Sent " );
243 if ( q->name() == "" ||
244 !d->prepared.contains( q->name() ) )
245 {
246 PgParse a( queryString( q ), q->name() );
247 a.enqueue( writeBuffer() );
248
249 if ( q->name() != "" ) {
250 d->prepared.insert( q->name(), this );
251 d->preparesPending.append( q->name() );
252 }
253
254 s.append( "parse/" );
255 }
256
257 PgBind b( q->name() );
258 b.bind( q->values() );
259 b.enqueue( writeBuffer() );
260
261 PgDescribe c;
262 c.enqueue( writeBuffer() );
263
264 PgExecute ex;
265 ex.enqueue( writeBuffer() );
266
267 PgSync e;
268 e.enqueue( writeBuffer() );
269
270 s.append( "execute for " );
271 s.append( q->description() );
272 s.append( " on backend " );
273 s.appendNumber( connectionNumber() );
274 ::log( s, Log::Debug );
275 recordExecution();
276 }
277
278
react(Event e)279 void Postgres::react( Event e )
280 {
281 switch ( e ) {
282 case Connect:
283 {
284 PgStartup msg;
285 msg.setOption( "user", d->user );
286 msg.setOption( "database", name() );
287 msg.setOption( "search_path",
288 Configuration::text( Configuration::DbSchema ) );
289 msg.enqueue( writeBuffer() );
290
291 d->active = true;
292 d->startup = true;
293 }
294 break;
295
296 case Read:
297 while ( d->active && hasMessage( readBuffer() ) ) {
298 /* We call a function to process every message we receive.
299 This function is expected to parse and remove a message
300 from the readBuffer, throwing an exception for malformed
301 messages, and setting d->unknownMessage for messages that
302 it can't or won't handle. */
303
304 char msg = (*readBuffer())[0];
305 try {
306 if ( d->startup ) {
307 if ( !d->authenticated )
308 authentication( msg );
309 else
310 backendStartup( msg );
311 }
312 else {
313 process( msg );
314 }
315
316 if ( d->unknownMessage )
317 unknown( msg );
318 }
319 catch ( PgServerMessage::Error e ) {
320 error( "Malformed " + EString( &msg, 1 ).quoted() +
321 " message received." );
322 }
323 }
324 if ( d->needNotify )
325 d->needNotify->notify();
326 d->needNotify = 0;
327
328 break;
329
330 case Error:
331 error( "Couldn't connect to PostgreSQL." );
332 break;
333
334 case Close:
335 if ( d->active )
336 error( "Connection terminated by the server." );
337 break;
338
339 case Timeout:
340 if ( d->transaction &&
341 ( d->transaction->state() == Transaction::Completed ||
342 d->transaction->state() == Transaction::RolledBack ) )
343 d->transaction = 0;
344
345 if ( !d->active || d->startup ) {
346 error( "Timeout negotiating connection to PostgreSQL." );
347 }
348 else if ( d->transaction ) {
349 if ( d->queries.isEmpty() )
350 d->transaction->rollback();
351 else if ( d->backendPid )
352 new PgData::LockSpotter( d->backendPid, d->transaction );
353 else
354 log( "Transaction unexpectedly slow; continuing " );
355 }
356 else if ( d->queries.isEmpty() &&
357 ::listener != this &&
358 server().protocol() != Endpoint::Unix &&
359 handlesNeeded() < numHandles() ) {
360 log( "Closing idle database backend " + fn( connectionNumber() ) +
361 " (" + fn( numHandles()-1 ) + " remaining)" );
362 shutdown();
363 }
364 break;
365
366 case Shutdown:
367 shutdown();
368 break;
369 }
370
371 if ( usable() ) {
372 processQueue();
373 if ( d->queries.isEmpty() && !d->transaction ) {
374 uint interval =
375 Configuration::scalar( Configuration::DbHandleInterval );
376 if ( ::listener == this )
377 interval = interval * 2;
378 else if ( idleHandles() > 2 && interval > 20 )
379 interval = 20;
380 setTimeoutAfter( interval );
381 }
382 }
383 if ( d->transaction && d->queries.isEmpty() ) {
384 // if the transaction doesn't do anything, just sits there
385 // holding its handle, we have to rollback() in order to free
386 // the handle for better work. we do it more quickly for a
387 // broken transaction than for one that seems fine.
388 if ( d->transaction->state() == Transaction::Failed )
389 setTimeoutAfter( 5 );
390 else
391 setTimeoutAfter( 20 );
392 }
393 }
394
395
396 /*! This function handles the authentication phase of the protocol. It
397 expects and responds to an authentication request, and waits for a
398 positive response before entering the backend startup phase. It is
399 called by react with the \a type of the message to process.
400 */
401
authentication(char type)402 void Postgres::authentication( char type )
403 {
404 switch ( type ) {
405 case 'R':
406 {
407 PgAuthRequest r( readBuffer() );
408
409 switch ( r.type() ) {
410 case PgAuthRequest::Success:
411 d->authenticated = true;
412 break;
413
414 case PgAuthRequest::Password:
415 case PgAuthRequest::Crypt:
416 case PgAuthRequest::MD5:
417 {
418 EString pass = password();
419
420 if ( d->setSessionAuthorisation ) {
421 error( "Cannot supply credentials during proxy "
422 "authentication" );
423 return;
424 }
425
426 if ( r.type() == PgAuthRequest::Crypt )
427 pass = ::crypt( pass.cstr(), r.salt().cstr() );
428 else if ( r.type() == PgAuthRequest::MD5 )
429 pass = "md5" + MD5::hash(
430 MD5::hash(
431 pass + d->user
432 ).hex() + r.salt()
433 ).hex();
434
435 PgPasswordMessage p( pass );
436 p.enqueue( writeBuffer() );
437 }
438 break;
439
440 default:
441 error( "Unsupported PgAuthRequest." );
442 break;
443 }
444 }
445 break;
446
447 default:
448 d->unknownMessage = true;
449 break;
450 }
451 }
452
453
454 /*! This function negotiates the backend startup phase of the protocol
455 (storing any messages the server sends us), concluding the startup
456 process when the server indicates that it is ready for queries. It
457 is called by react() with the \a type of the message to process.
458 */
459
backendStartup(char type)460 void Postgres::backendStartup( char type )
461 {
462 switch ( type ) {
463 case 'Z':
464 setTimeout( 0 );
465 d->startup = false;
466 if ( CitextLookup::necessary() )
467 processQuery( (new CitextLookup())->q );
468 addHandle( this );
469
470 // This successfully concludes connection startup. We'll leave
471 // this message unparsed, so that process() can handle it like
472 // any other PgReady.
473
474 if ( d->setSessionAuthorisation )
475 processQuery( new Query( "SET SESSION AUTHORIZATION " +
476 Database::user(), 0 ) );
477
478 break;
479
480 case 'K':
481 d->keydata = new PgKeyData( readBuffer() );
482 log( "Postgres backend " + fn( connectionNumber() ) +
483 " has pid " + fn( d->keydata->pid() ), Log::Debug );
484 d->backendPid = d->keydata->pid();
485 break;
486
487 default:
488 d->unknownMessage = true;
489 break;
490 }
491 }
492
493
494 /*! This function handles interaction with the server once the startup
495 phase is complete. It is called by react() with the \a type of the
496 message to process.
497 */
498
process(char type)499 void Postgres::process( char type )
500 {
501 Query * q = d->queries.firstElement();
502 Scope x;
503 if ( q && q->log() )
504 x.setLog( q->log() );
505
506 extendTimeout( 5 );
507
508 switch ( type ) {
509 case '1':
510 {
511 PgParseComplete msg( readBuffer() );
512 if ( q && q->name() != "" )
513 d->preparesPending.shift();
514 }
515 break;
516
517 case '2':
518 {
519 PgBindComplete msg( readBuffer() );
520 }
521 break;
522
523 case 'n':
524 {
525 PgNoData msg( readBuffer() );
526 }
527 break;
528
529 case 't':
530 (void)new PgParameterDescription( readBuffer() );
531 break;
532
533 case 'G':
534 {
535 PgCopyInResponse msg( readBuffer() );
536 if ( q && q->inputLines() ) {
537 log( "Sending " + fn( q->inputLines()->count() ) +
538 " data rows",
539 Log::Debug );
540 PgCopyData cd( q );
541 PgCopyDone e;
542
543 cd.enqueue( writeBuffer() );
544 e.enqueue( writeBuffer() );
545 }
546 else {
547 PgCopyFail f;
548 f.enqueue( writeBuffer() );
549 }
550
551 PgSync s;
552 s.enqueue( writeBuffer() );
553 d->sendingCopy = false;
554 }
555 break;
556
557 case 'T':
558 d->description = new PgRowDescription( readBuffer() );
559 break;
560
561 case 'D':
562 {
563 if ( !q || !d->description ) {
564 error( "Unexpected data row" );
565 return;
566 }
567
568 PgDataRow msg( readBuffer(), d->description );
569 q->addRow( msg.row() );
570 if ( d->needNotify && d->needNotify != q )
571 d->needNotify->notify();
572 d->needNotify = q;
573 }
574 break;
575
576 case 'I':
577 case 'C':
578 {
579 PgCommandComplete * cc = 0;
580 if ( type == 'C' )
581 cc = new PgCommandComplete( readBuffer() );
582 else
583 PgEmptyQueryResponse msg( readBuffer() );
584
585 if ( q ) {
586 EString s;
587 s.append( "Dequeueing query " );
588 s.append( q->description() );
589 s.append( " on backend " );
590 s.appendNumber( connectionNumber() );
591 EString command;
592 if ( cc )
593 command = cc->tag().section( " ", 1 );
594 if ( cc && !q->rows() ) {
595 uint an = 2;
596 if ( command == "INSERT" )
597 an = 3;
598 q->setRows( cc->tag().section( " ", an ).number( 0 ) );
599 }
600 if ( q->rows() ||
601 command == "SELECT" || command == "FETCH" ||
602 command == "INSERT" || command == "UPDATE" ) {
603 s.append( " (with " );
604 s.appendNumber( q->rows() );
605 s.append( " rows)" );
606 }
607 ::log( s, Log::Info );
608 if ( !q->done() ) {
609 q->setState( Query::Completed );
610 countQueries( q );
611 }
612 d->queries.shift();
613 q->notify();
614 d->needNotify = 0;
615 }
616 }
617 break;
618
619 case 'Z':
620 {
621 PgReady msg( readBuffer() );
622 setState( msg.state() );
623 }
624 break;
625
626 case 'A':
627 {
628 PgNotificationResponse msg( readBuffer() );
629 EString s;
630 if ( !msg.source().isEmpty() )
631 s = " (" + msg.source() + ")";
632 log( "Received notify " + msg.name().quoted() +
633 " from server pid " + fn( msg.pid() ) + s, Log::Debug );
634 DatabaseSignal::notifyAll( msg.name() );
635 }
636 break;
637
638 default:
639 d->unknownMessage = true;
640 break;
641 }
642 }
643
644
645 /*! This function handles unknown or unwanted messages that some other
646 function declined to process (by setting d->unknownMessage). It is
647 called by react() with the \a type of the unknown message.
648 */
649
unknown(char type)650 void Postgres::unknown( char type )
651 {
652 switch ( type ) {
653 case 'S':
654 {
655 d->unknownMessage = false;
656 PgParameterStatus msg( readBuffer() );
657
658 EString n = msg.name();
659 EString v = msg.value();
660 EString e;
661 bool known = true;
662 if ( n == "client_encoding" ) {
663 if ( v != "UTF8" && v != "SQL_ASCII" )
664 e = "Unexpected client encoding: ";
665 }
666 else if ( n == "DateStyle" ) {
667 // we want ISO on the list somewhere
668 if ( !v.containsWord( "ISO" ) )
669 e = "DateStyle apparently does not support ISO: ";
670 }
671 else if ( n == "integer_datetimes" ) {
672 // PG documentation says:
673 // "Use 64-bit integer storage for datetimes and
674 // intervals, rather than the default floating-point
675 // storage. This reduces the range of representable
676 // values but guarantees microsecond precision across
677 // the full range (see Section 8.5 for more
678 // information)."
679 // We don't care about that. Email uses only seconds,
680 // and only a fairly limited time range. Both on and
681 // off are okay.
682 }
683 else if ( n == "is_superuser" ) {
684 if ( v.simplified().lower() != "off" )
685 e = "Connected as superuser: ";
686 }
687 else if ( n == "server_encoding" ) {
688 if ( v != "UTF8" && v != "SQL_ASCII" )
689 e = "Unexpected server encoding: ";
690 }
691 else if ( n == "server_version" ) {
692 // We're forever doomed to parse version strings to
693 // recompute the server_version_num that the server
694 // knows but purposely doesn't send us because:
695 //
696 // "I think this is just a waste of network bandwidth.
697 // No client-side code could safely depend on its being
698 // available for many years yet, therefore they're going
699 // to keep using server_version." (2015-01-09)
700 bool ok = true;
701
702 serverVersion = 10000 * v.section( ".", 1 ).number( &ok );
703 if (ok && version() < 100000)
704 serverVersion += 100 * v.section( ".", 2 ).number( &ok );
705 if ( !ok || version() < 90100 )
706 e = "Archiveopteryx requires PostgreSQL 9.1 or higher: ";
707 }
708 else if ( n == "session_authorization" ) {
709 // we could test that v is d->user, but I don't think
710 // we care. besides it might sound an alarm about our
711 // ident workarounds.
712 }
713 else if ( n == "standard_conforming_strings" ) {
714 // hm... ?
715 }
716 else if ( n == "TimeZone" ) {
717 // we don't care.
718 }
719 else {
720 known = false;
721 }
722 if ( known && e.isEmpty() ) {
723 // we're entirely silent about this. all is well.
724 }
725 else {
726 EString s( "PostgreSQL server: " );
727 if ( e.isEmpty() )
728 s.append( "SET " );
729 else
730 s.append( e );
731 s.append( n );
732 s.append( "=" );
733 s.append( v.quoted() );
734 if ( e.isEmpty() )
735 ::log( s, Log::Debug );
736 else
737 ::log( s );
738 }
739 }
740 break;
741
742 case 'N':
743 case 'E':
744 d->unknownMessage = false;
745 serverMessage();
746 break;
747
748 default:
749 {
750 EString err = "Unexpected message (";
751
752 if ( type > 32 && type < 127 )
753 err.append( type );
754 else
755 err.append( "%" + fn( (int)type, 16 ) );
756
757 err.append( ") received" );
758 if ( d->startup ) {
759 if ( !d->authenticated )
760 err.append( " during authentication" );
761 else
762 err.append( " during backend startup" );
763 }
764 err.append( "." );
765 error( err );
766 }
767 break;
768 }
769 }
770
771
772 /*! This function handles errors and other messages from the server.
773
774 Uses the sqlstates specified
775 http://www.postgresql.org/docs/current/static/protocol.html
776 extensively.
777 */
778
serverMessage()779 void Postgres::serverMessage()
780 {
781 Scope x;
782 EString s;
783 PgMessage msg( readBuffer() );
784 Query *q = d->queries.firstElement();
785 EString m( msg.message() );
786 EString code = msg.code();
787 Endpoint server( peer() );
788
789 if ( code == "57P03" ) {
790 log( "Retrying connection after delay because PostgreSQL "
791 "is still starting up.", Log::Info );
792 close();
793 sleep( 1 );
794 connect( server );
795 }
796 else if ( code == "57P01" || code == "57P02" ) {
797 if ( code == "57P01" )
798 log( "PostgreSQL is shutting down; closing connection.", Log::Info );
799 else
800 log( "PostgreSQL reports a crash; closing connection.", Log::Info );
801 removeHandle( this );
802 if ( ::listener == this )
803 ::listener = 0;
804 close();
805 error( "PostgreSQL server shut down" );
806 }
807 else if ( code == "28000" && m.lower().containsWord( "ident" ) ) {
808 int b = m.find( '"' );
809 int e = m.find( '"', b+1 );
810 EString user( m.mid( b+1, e-b-1 ) );
811
812 struct passwd * u = getpwnam( d->user.cstr() );
813
814 struct passwd * p = 0;
815 const char * pg
816 = Configuration::compiledIn( Configuration::PgUser );
817
818 if ( pg )
819 p = getpwnam( pg );
820 if ( !p )
821 p = getpwnam( "postgres" );
822 if ( !p )
823 p = getpwnam( "pgsql" );
824
825 if ( !d->identBreakageSeen && loginAs() == DbOwner &&
826 u == 0 && p != 0 )
827 {
828 d->identBreakageSeen = true;
829 d->setSessionAuthorisation = true;
830 log( "Attempting to authenticate as superuser to use "
831 "SET SESSION AUTHORIZATION", Log::Info );
832 d->user = EString( p->pw_name );
833 uid_t e = geteuid();
834 setreuid( 0, p->pw_uid );
835 close();
836 connect( server );
837 setreuid( 0, e );
838 }
839 else if ( s == Configuration::text(Configuration::JailUser) &&
840 Configuration::toggle( Configuration::Security ) &&
841 self().protocol() != Endpoint::Unix )
842 {
843 // If we connected via IPv4 or IPv6, early enough that
844 // postgres had a chance to reject us, we'll try again.
845 d->identBreakageSeen = true;
846 log( "PostgreSQL demanded IDENT, which did not match "
847 "during startup. Retrying.", Log::Info );
848 close();
849 connect( server );
850 }
851 else {
852 log( "PostgreSQL refuses authentication because this "
853 "process is not running as user " + user.quoted() +
854 ". See http://aox.org/faq/mailstore#ident",
855 Log::Disaster );
856 }
857 }
858 else if ( code == "28000" ) {
859 log( "Cannot authenticate as PostgreSQL user " + d->user.quoted() +
860 ". Server message: " + msg.message(), Log::Disaster );
861 }
862 else if ( code.startsWith( "53" ) ) {
863 uint m = Configuration::scalar( Configuration::DbMaxHandles );
864 if ( code == "53000" )
865 log( "PostgreSQL server reports too many client connections. "
866 "Our connection count is " + fn( numHandles() ) + ", "
867 "configured maximum is " + fn( m ) + ".",
868 Log::Error );
869 else
870 log( "PostgreSQL server has a resource problem (" + code + "): " +
871 msg.message(),
872 Log::Significant );
873 if ( m > 2 ) {
874 log( "Setting db-max-handles to 2 (was " + fn( m ) + ")" );
875 Configuration::add( "db-max-handles = 2" );
876 }
877
878 }
879 else if ( msg.type() == PgMessage::Notification ) {
880 s.append( "PostgreSQL server: " );
881 if ( q ) {
882 s.append( "Query " + q->description() + ": " );
883 x.setLog( q->log() );
884 }
885 s.append( m );
886 if ( !code.startsWith( "00" ) )
887 s.append( " (warning)" );
888 ::log( s, Log::Debug );
889 }
890 else if ( q && !code.startsWith( "00" ) ) {
891 s.append( "PostgreSQL server: " );
892 s.append( "Query " + q->description() + " failed: " );
893 x.setLog( q->log() );
894 s.append( m );
895 if ( !msg.detail().isEmpty() )
896 s.append( " (" + msg.detail() + ")" );
897 s.append( " (" + code + ")" );
898
899 // If we sent a Parse message for a named prepared statement
900 // while processing this query, but don't already know that
901 // it succeeded, we'll assume that statement name does not
902 // exist for future use.
903 EString * pp = d->preparesPending.first();
904 if ( q->name() != "" && pp && *pp == q->name() ) {
905 d->prepared.remove( q->name() );
906 d->preparesPending.shift();
907 }
908 if ( q->inputLines() )
909 d->sendingCopy = false;
910 d->queries.shift();
911 m = mapped( m );
912 if ( !msg.detail().isEmpty() )
913 s.append( " (" + msg.detail() + ")" );
914 q->setError( m );
915 q->notify();
916 }
917 else {
918 ::log( "PostgreSQL server message could not be interpreted."
919 " Message: " + msg.message() +
920 " SQL state code: " + code +
921 " Severity: " + msg.severity().lower(),
922 Log::Error );
923 }
924
925 if ( code.startsWith( "08" ) ) // connection exception
926 error( "PostgreSQL server error: " + s );
927 }
928
929
// these errors are based on a selection of the results from
// select indexname from pg_indexes where tablename in
// (select tablename from pg_tables where tableowner='aoxsuper')

// Maps the name of an index or constraint, as it appears in a server
// error message, to an explanation for humans. Postgres::mapped()
// searches the table linearly; the {0,0} entry marks the end.

static const struct {
    const char * constraint;
    const char * human;
} errormap[] = {
    // some index names
    {"addresses_nld_key",
     "Operation would create two identical addresses" },
    {"u_l",
     "Operation would create two users with identical login names"},
    // some constraints from our postgresql schema
    {"aliases_address_fkey", // contype f
     "Operation would create an alias without an address"},
    {"aliases_address_key", // contype u
     "Operation would create two aliases with the same address"},
    {"annotation_names_name_key", // contype u
     "Operation would create two annotation_names rows with the same name"},
    {"annotations_mailbox_key", // contype u
     "Operation would create a duplicate annotations row"},
    {"annotations_mailbox_key1", // contype u
     "Operation would create a duplicate annotations row"},
    // XXX where does the annotations unique condition end up?
    {"deliveries_message_key", // contype u
     "Operation would store the same message for delivery twice"},
    {"field_names_name_key", // contype u
     "Operation would create two header field names with the same name"},
    {"fn_uname",
     "Operation would store two identical flag names separately"},
    {"group_members_groupname_fkey", // contype f
     "Operation would create group_members row with invalid groupname"},
    {"group_members_member_fkey", // contype f
     "Operation would create group_members row with invalid member"},
    {"group_members_pkey", // contype p
     "Operation would create duplicate group_members row"},
    // XXX shouldn't groups.name be unique? and different from all users.name?
    {"mailboxes_name_key", // contype u
     "Operation would create two mailboxes with the same name"},
    {"mailboxes_owner_fkey", // contype f
     "Operation would create a mailbox without an owner"},
    {"messages_id_key", // contype u
     "Operation would create two messages objects with the same ID"},
    {"namespaces_name_key", // contype u
     "Operation would create two user namespaces with the same name"},
    {"permissions_mailbox_fkey", // contype f
     "Operation would create a permissions row without a mailbox"},
    {"permissions_pkey", // contype p
     "Operation would create a duplicate permissions row"},
    {"scripts_owner_key", // contype u
     "Operation would store two scripts with the same owner and name"},
    // XXX shouldn't users.alias be unique?
    {"users_alias_fkey", // contype f
     "Operation would create a users row without an alias"},
    {"users_parentspace_fkey", // contype f
     "Operation would create a users row without a namespace"},
    {0,0}
};
989
990
991
992 /*! Looks for constraint names in \a s and returns an error message
993 corresponding to the relevant constraint. Returns \a s if it finds
994 none.
995 */
996
mapped(const EString & s) const997 EString Postgres::mapped( const EString & s ) const
998 {
999 if ( !s.contains( "_" ) )
1000 return "PostgreSQL Server: " + s;
1001
1002 EString h;
1003 uint maps = 0;
1004 EString w;
1005 uint i = 0;
1006 while ( maps < 2 && i <= s.length() ) {
1007 char c = s[i];
1008 if ( ( c >= 'a' && c <= 'z' ) ||
1009 ( c >= 'A' && c <= 'Z' ) ||
1010 ( c >= '0' && c <= '9' ) ||
1011 ( c == '_' ) ) {
1012 w.append( c );
1013 }
1014 else if ( !w.isEmpty() ) {
1015 uint j = 0;
1016 while ( errormap[j].constraint && w != errormap[j].constraint )
1017 j++;
1018 if ( errormap[j].constraint ) {
1019 maps++;
1020 h = errormap[j].human;
1021 h.append( " (" );
1022 h.append( w );
1023 h.append( ")" );
1024 }
1025 w.truncate();
1026 }
1027 i++;
1028 }
1029 if ( maps != 1 )
1030 return "PostgreSQL Server: " + s;
1031
1032 return h;
1033 }
1034
1035
1036
1037 /*! Handles all protocol/socket errors by logging the error message \a s
1038 and closing the connection after emptying the write buffer and
1039 notifying any pending queries of the failure.
1040
1041 */
1042
error(const EString & s)1043 void Postgres::error( const EString &s )
1044 {
1045 if ( ::listener == this )
1046 ::listener = 0;
1047
1048 Scope x( log() );
1049 ::log( s + " (on backend " + fn( connectionNumber() ) + ")", Log::Error );
1050
1051 d->error = true;
1052 d->active = false;
1053 setState( Broken );
1054
1055 List< Query >::Iterator q( d->queries );
1056 while ( q ) {
1057 q->setError( s );
1058 q->notify();
1059 ++q;
1060 }
1061
1062 removeHandle( this );
1063
1064 writeBuffer()->remove( writeBuffer()->size() );
1065 Connection::setState( Closing );
1066 }
1067
1068
1069 /*! Sends a termination message and takes this database handle out of
1070 circulation gracefully.
1071 */
1072
shutdown()1073 void Postgres::shutdown()
1074 {
1075 if ( ::listener == this )
1076 ::listener = 0;
1077
1078 PgTerminate msg;
1079 msg.enqueue( writeBuffer() );
1080
1081 if ( d->transaction ) {
1082 d->transaction->setError( 0, "Database connection shutdown" );
1083 d->transaction->notify();
1084 }
1085 List< Query >::Iterator q( d->queries );
1086 while ( q ) {
1087 if ( !q->done() ) {
1088 q->setError( "Database connection shutdown" );
1089 q->notify();
1090 }
1091 ++q;
1092 }
1093
1094 removeHandle( this );
1095 d->active = false;
1096 }
1097
1098
hasMessage(Buffer * b)1099 static bool hasMessage( Buffer *b )
1100 {
1101 if ( b->size() < 5 ||
1102 b->size() < 1+( (uint)((*b)[1]<<24)|((*b)[2]<<16)|
1103 ((*b)[3]<<8)|((*b)[4]) ) )
1104 return false;
1105 return true;
1106 }
1107
1108
1109 /*! Returns true if this handle is willing to process new queries: i.e.
1110 if it has an active and error-free connection to the server, and no
1111 outstanding queries; and false otherwise.
1112 */
1113
usable() const1114 bool Postgres::usable() const
1115 {
1116 return ( d->active && !d->startup &&
1117 !( state() == Connecting || state() == Broken ) &&
1118 d->queries.isEmpty() );
1119 }
1120
1121
1122 static GraphableCounter * goodQueries = 0;
1123 static GraphableCounter * badQueries = 0;
1124
1125
1126 /*! Updates the statistics when \a q is done. */
1127
countQueries(class Query * q)1128 void Postgres::countQueries( class Query * q )
1129 {
1130 if ( !goodQueries ) {
1131 goodQueries = new GraphableCounter( "queries-executed" ); // bad name?
1132 badQueries = new GraphableCounter( "queries-failed" ); // bad name?
1133 }
1134
1135 if ( !q->failed() )
1136 goodQueries->tick();
1137 else if ( !q->canFail() )
1138 badQueries->tick();
1139 ; // a query which fails but canFail is not counted anywhere.
1140
1141 // later also use GraphableDataSet to keep track of query
1142 // execution times, but not right now.
1143 }
1144
1145
1146 /*! Returns the Postgres server's declared version number as an
1147 integer. 8.1.0 is returned as 80100, 8.3.2 as 80302.
1148
1149 The version number is learned immediately after connecting.
1150 version() returns 0 until the first Postgres instance learns the
1151 server version.
1152 */
1153
version()1154 uint Postgres::version()
1155 {
1156 return ::serverVersion;
1157 }
1158
1159
1160 /*! Makes sure Postgres sends as many LISTEN commands as necessary,
1161 see DatabaseSignal and
1162 http://www.postgresql.org/docs/8.1/static/sql-listen.html
1163
1164 */
1165
sendListen()1166 void Postgres::sendListen()
1167 {
1168 EStringList::Iterator s( DatabaseSignal::names() );
1169 while ( s ) {
1170 EString name = *s;
1171 ++s;
1172 if ( !d->listening.contains( name ) ) {
1173 d->listening.append( name );
1174 if ( !name.boring() )
1175 name = name.quoted();
1176 processQuery( new Query( "listen " + name, 0 ) );
1177 }
1178 }
1179 }
1180
1181
1182 /*! Returns the query string for \a q, after possibly applying
1183 version-specific hacks and workarounds. */
1184
queryString(Query * q)1185 EString Postgres::queryString( Query * q )
1186 {
1187 if ( !q->name().isEmpty() )
1188 return q->string();
1189
1190 EString s( q->string() );
1191
1192 if ( s != q->string() ) {
1193 Scope x( q->log() );
1194 log( "Changing query string to: " + s, Log::Debug );
1195 }
1196
1197 return s;
1198 }
1199
1200
1201 class PgCanceller
1202 : public Postgres
1203 {
1204 private:
1205 PgKeyData * k;
1206
1207 public:
PgCanceller(PgKeyData * key)1208 PgCanceller( PgKeyData * key )
1209 : Postgres(), k( key )
1210 {
1211 log( "Sending cancel for pid " + fn( k->pid() ), Log::Debug );
1212 }
1213
react(Event e)1214 void react( Event e )
1215 {
1216 switch (e) {
1217 case Connect:
1218 {
1219 PgCancel msg( k );
1220 msg.enqueue( writeBuffer() );
1221 Connection::setState( Closing );
1222 }
1223 break;
1224
1225 default:
1226 break;
1227 }
1228 }
1229 };
1230
1231
1232 /*! Issues a cancel request for the query \a q if it is being executed
1233 by this Postgres object. If not, it does nothing.
1234 */
1235
cancel(Query * q)1236 void Postgres::cancel( Query * q )
1237 {
1238 if ( d->queries.find( q ) )
1239 (void)new PgCanceller( d->keydata );
1240 }
1241