1 /*++
2 /* NAME
3 /*	qmgr_deliver 3
4 /* SUMMARY
5 /*	deliver one per-site queue entry to that site
6 /* SYNOPSIS
7 /*	#include "qmgr.h"
8 /*
9 /*	int	qmgr_deliver_concurrency;
10 /*
11 /*	int	qmgr_deliver(transport, fp)
12 /*	QMGR_TRANSPORT *transport;
13 /*	VSTREAM	*fp;
14 /* DESCRIPTION
15 /*	This module implements the client side of the `queue manager
16 /*	to delivery agent' protocol. The queue manager uses
17 /*	asynchronous I/O so that it can drive multiple delivery
18 /*	agents in parallel. Depending on the outcome of a delivery
19 /*	attempt, the status of messages, queues and transports is
20 /*	updated.
21 /*
22 /*	qmgr_deliver_concurrency is a global counter that says how
23 /*	many delivery processes are in use. This can be used, for
24 /*	example, to control the size of the `active' message queue.
25 /*
26 /*	qmgr_deliver() executes when a delivery process announces its
27 /*	availability for the named transport. It arranges for delivery
28 /*	of a suitable queue entry.  The \fIfp\fR argument specifies a
29 /*	stream that is connected to the delivery process, or a null
30 /*	pointer if the transport accepts no connection. Upon completion
31 /*	of delivery (successful or not), the stream is closed, so that the
32 /*	delivery process is released.
33 /* DIAGNOSTICS
34 /* LICENSE
35 /* .ad
36 /* .fi
37 /*	The Secure Mailer license must be distributed with this software.
38 /* AUTHOR(S)
39 /*	Wietse Venema
40 /*	IBM T.J. Watson Research
41 /*	P.O. Box 704
42 /*	Yorktown Heights, NY 10598, USA
43 /*
44 /*	Wietse Venema
45 /*	Google, Inc.
46 /*	111 8th Avenue
47 /*	New York, NY 10011, USA
48 /*--*/
49 
50 /* System library. */
51 
52 #include <sys_defs.h>
53 #include <time.h>
54 #include <string.h>
55 
56 /* Utility library. */
57 
58 #include <msg.h>
59 #include <vstring.h>
60 #include <vstream.h>
61 #include <vstring_vstream.h>
62 #include <events.h>
63 #include <iostuff.h>
64 #include <stringops.h>
65 #include <mymalloc.h>
66 
67 /* Global library. */
68 
69 #include <mail_queue.h>
70 #include <mail_proto.h>
71 #include <recipient_list.h>
72 #include <mail_params.h>
73 #include <deliver_request.h>
74 #include <verp_sender.h>
75 #include <dsn_util.h>
76 #include <dsn_buf.h>
77 #include <dsb_scan.h>
78 #include <rcpt_print.h>
79 #include <smtputf8.h>
80 
81 /* Application-specific. */
82 
83 #include "qmgr.h"
84 
85  /*
86   * Important note on the _transport_rate_delay implementation: after
87   * qmgr_transport_alloc() sets the QMGR_TRANSPORT_STAT_RATE_LOCK flag, all
88   * code paths must directly or indirectly invoke qmgr_transport_unthrottle()
89   * or qmgr_transport_throttle(). Otherwise, transports with non-zero
90   * _transport_rate_delay will become stuck.
91   */
92 
93 int     qmgr_deliver_concurrency;
94 
95  /*
96   * Message delivery status codes.
97   */
98 #define DELIVER_STAT_OK		0	/* all recipients delivered */
99 #define DELIVER_STAT_DEFER	1	/* try some recipients later */
100 #define DELIVER_STAT_CRASH	2	/* mailer internal problem */
101 
102 /* qmgr_deliver_initial_reply - retrieve initial delivery process response */
103 
qmgr_deliver_initial_reply(VSTREAM * stream)104 static int qmgr_deliver_initial_reply(VSTREAM *stream)
105 {
106     if (peekfd(vstream_fileno(stream)) < 0) {
107 	msg_warn("%s: premature disconnect", VSTREAM_PATH(stream));
108 	return (DELIVER_STAT_CRASH);
109     } else if (attr_scan(stream, ATTR_FLAG_STRICT,
110 		  RECV_ATTR_STREQ(MAIL_ATTR_PROTO, MAIL_ATTR_PROTO_DELIVER),
111 			 ATTR_TYPE_END) != 0) {
112 	msg_warn("%s: malformed response", VSTREAM_PATH(stream));
113 	return (DELIVER_STAT_DEFER);
114     } else {
115 	return (0);
116     }
117 }
118 
119 /* qmgr_deliver_final_reply - retrieve final delivery process response */
120 
qmgr_deliver_final_reply(VSTREAM * stream,DSN_BUF * dsb)121 static int qmgr_deliver_final_reply(VSTREAM *stream, DSN_BUF *dsb)
122 {
123     int     stat;
124 
125     if (peekfd(vstream_fileno(stream)) < 0) {
126 	msg_warn("%s: premature disconnect", VSTREAM_PATH(stream));
127 	return (DELIVER_STAT_CRASH);
128     } else if (attr_scan(stream, ATTR_FLAG_STRICT,
129 			 RECV_ATTR_FUNC(dsb_scan, (void *) dsb),
130 			 RECV_ATTR_INT(MAIL_ATTR_STATUS, &stat),
131 			 ATTR_TYPE_END) != 2) {
132 	msg_warn("%s: malformed response", VSTREAM_PATH(stream));
133 	return (DELIVER_STAT_CRASH);
134     } else {
135 	return (stat ? DELIVER_STAT_DEFER : 0);
136     }
137 }
138 
139 /* qmgr_deliver_send_request - send delivery request to delivery process */
140 
qmgr_deliver_send_request(QMGR_ENTRY * entry,VSTREAM * stream)141 static int qmgr_deliver_send_request(QMGR_ENTRY *entry, VSTREAM *stream)
142 {
143     RECIPIENT_LIST list = entry->rcpt_list;
144     RECIPIENT *recipient;
145     QMGR_MESSAGE *message = entry->message;
146     VSTRING *sender_buf = 0;
147     MSG_STATS stats;
148     char   *sender;
149     int     flags;
150     int     smtputf8 = message->smtputf8;
151     const char *addr;
152 
153     /*
154      * Todo: integrate with code up-stream that builds the delivery request.
155      */
156     for (recipient = list.info; recipient < list.info + list.len; recipient++)
157 	if (var_smtputf8_enable && (addr = recipient->address)[0]
158 	    && !allascii(addr) && valid_utf8_string(addr, strlen(addr))) {
159 	    smtputf8 |= SMTPUTF8_FLAG_RECIPIENT;
160 	    if (message->verp_delims)
161 		smtputf8 |= SMTPUTF8_FLAG_SENDER;
162 	}
163 
164     /*
165      * If variable envelope return path is requested, change prefix+@origin
166      * into prefix+user=domain@origin. Note that with VERP there is only one
167      * recipient per delivery.
168      */
169     if (message->verp_delims == 0) {
170 	sender = message->sender;
171     } else {
172 	sender_buf = vstring_alloc(100);
173 	verp_sender(sender_buf, message->verp_delims,
174 		    message->sender, list.info);
175 	sender = vstring_str(sender_buf);
176     }
177 
178     flags = message->tflags
179 	| entry->queue->dflags
180 	| (message->inspect_xport ? DEL_REQ_FLAG_BOUNCE : DEL_REQ_FLAG_DEFLT);
181     (void) QMGR_MSG_STATS(&stats, message);
182     attr_print(stream, ATTR_FLAG_NONE,
183 	       SEND_ATTR_INT(MAIL_ATTR_FLAGS, flags),
184 	       SEND_ATTR_STR(MAIL_ATTR_QUEUE, message->queue_name),
185 	       SEND_ATTR_STR(MAIL_ATTR_QUEUEID, message->queue_id),
186 	       SEND_ATTR_LONG(MAIL_ATTR_OFFSET, message->data_offset),
187 	       SEND_ATTR_LONG(MAIL_ATTR_SIZE, message->cont_length),
188 	       SEND_ATTR_STR(MAIL_ATTR_NEXTHOP, entry->queue->nexthop),
189 	       SEND_ATTR_STR(MAIL_ATTR_ENCODING, message->encoding),
190 	       SEND_ATTR_INT(MAIL_ATTR_SMTPUTF8, smtputf8),
191 	       SEND_ATTR_STR(MAIL_ATTR_SENDER, sender),
192 	       SEND_ATTR_STR(MAIL_ATTR_DSN_ENVID, message->dsn_envid),
193 	       SEND_ATTR_INT(MAIL_ATTR_DSN_RET, message->dsn_ret),
194 	       SEND_ATTR_FUNC(msg_stats_print, (const void *) &stats),
195     /* XXX Should be encapsulated with ATTR_TYPE_FUNC. */
196 	     SEND_ATTR_STR(MAIL_ATTR_LOG_CLIENT_NAME, message->client_name),
197 	     SEND_ATTR_STR(MAIL_ATTR_LOG_CLIENT_ADDR, message->client_addr),
198 	     SEND_ATTR_STR(MAIL_ATTR_LOG_CLIENT_PORT, message->client_port),
199 	     SEND_ATTR_STR(MAIL_ATTR_LOG_PROTO_NAME, message->client_proto),
200 	       SEND_ATTR_STR(MAIL_ATTR_LOG_HELO_NAME, message->client_helo),
201     /* XXX Should be encapsulated with ATTR_TYPE_FUNC. */
202 	       SEND_ATTR_STR(MAIL_ATTR_SASL_METHOD, message->sasl_method),
203 	     SEND_ATTR_STR(MAIL_ATTR_SASL_USERNAME, message->sasl_username),
204 	       SEND_ATTR_STR(MAIL_ATTR_SASL_SENDER, message->sasl_sender),
205     /* XXX Ditto if we want to pass TLS certificate info. */
206 	       SEND_ATTR_STR(MAIL_ATTR_LOG_IDENT, message->log_ident),
207 	     SEND_ATTR_STR(MAIL_ATTR_RWR_CONTEXT, message->rewrite_context),
208 	       SEND_ATTR_INT(MAIL_ATTR_RCPT_COUNT, list.len),
209 	       ATTR_TYPE_END);
210     if (sender_buf != 0)
211 	vstring_free(sender_buf);
212     for (recipient = list.info; recipient < list.info + list.len; recipient++)
213 	attr_print(stream, ATTR_FLAG_NONE,
214 		   SEND_ATTR_FUNC(rcpt_print, (const void *) recipient),
215 		   ATTR_TYPE_END);
216     if (vstream_fflush(stream) != 0) {
217 	msg_warn("write to process (%s): %m", entry->queue->transport->name);
218 	return (-1);
219     } else {
220 	if (msg_verbose)
221 	    msg_info("qmgr_deliver: site `%s'", entry->queue->name);
222 	return (0);
223     }
224 }
225 
226 /* qmgr_deliver_abort - transport response watchdog */
227 
qmgr_deliver_abort(int unused_event,void * context)228 static void qmgr_deliver_abort(int unused_event, void *context)
229 {
230     QMGR_ENTRY *entry = (QMGR_ENTRY *) context;
231     QMGR_QUEUE *queue = entry->queue;
232     QMGR_TRANSPORT *transport = queue->transport;
233     QMGR_MESSAGE *message = entry->message;
234 
235     msg_fatal("%s: timeout receiving delivery status from transport: %s",
236 	      message->queue_id, transport->name);
237 }
238 
239 /* qmgr_deliver_update - process delivery status report */
240 
qmgr_deliver_update(int unused_event,void * context)241 static void qmgr_deliver_update(int unused_event, void *context)
242 {
243     QMGR_ENTRY *entry = (QMGR_ENTRY *) context;
244     QMGR_QUEUE *queue = entry->queue;
245     QMGR_TRANSPORT *transport = queue->transport;
246     QMGR_MESSAGE *message = entry->message;
247     static DSN_BUF *dsb;
248     int     status;
249 
250     /*
251      * Release the delivery agent from a "hot" queue entry.
252      */
253 #define QMGR_DELIVER_RELEASE_AGENT(entry) do { \
254 	event_disable_readwrite(vstream_fileno(entry->stream)); \
255 	(void) vstream_fclose(entry->stream); \
256 	entry->stream = 0; \
257 	qmgr_deliver_concurrency--; \
258     } while (0)
259 
260     if (dsb == 0)
261 	dsb = dsb_create();
262 
263     /*
264      * The message transport has responded. Stop the watchdog timer.
265      */
266     event_cancel_timer(qmgr_deliver_abort, context);
267 
268     /*
269      * Retrieve the delivery agent status report. The numerical status code
270      * indicates if delivery should be tried again. The reason text is sent
271      * only when a site should be avoided for a while, so that the queue
272      * manager can log why it does not even try to schedule delivery to the
273      * affected recipients.
274      */
275     status = qmgr_deliver_final_reply(entry->stream, dsb);
276 
277     /*
278      * The mail delivery process failed for some reason (although delivery
279      * may have been successful). Back off with this transport type for a
280      * while. Dispose of queue entries for this transport that await
281      * selection (the todo lists). Stay away from queue entries that have
282      * been selected (the busy lists), or we would have dangling pointers.
283      * The queue itself won't go away before we dispose of the current queue
284      * entry.
285      */
286     if (status == DELIVER_STAT_CRASH) {
287 	message->flags |= DELIVER_STAT_DEFER;
288 #if 0
289 	whatsup = concatenate("unknown ", transport->name,
290 			      " mail transport error", (char *) 0);
291 	qmgr_transport_throttle(transport,
292 				DSN_SIMPLE(&dsb->dsn, "4.3.0", whatsup));
293 	myfree(whatsup);
294 #else
295 	qmgr_transport_throttle(transport,
296 				DSN_SIMPLE(&dsb->dsn, "4.3.0",
297 					   "unknown mail transport error"));
298 #endif
299 	msg_warn("transport %s failure -- see a previous warning/fatal/panic logfile record for the problem description",
300 		 transport->name);
301 
302 	/*
303 	 * Assume the worst and write a defer logfile record for each
304 	 * recipient. This omission was already present in the first queue
305 	 * manager implementation of 199703, and was fixed 200511.
306 	 *
307 	 * To avoid the synchronous qmgr_defer_recipient() operation for each
308 	 * recipient of this queue entry, release the delivery process and
309 	 * move the entry back to the todo queue. Let qmgr_defer_transport()
310 	 * log the recipient asynchronously if possible, and get out of here.
311 	 * Note: if asynchronous logging is not possible,
312 	 * qmgr_defer_transport() eventually invokes qmgr_entry_done() and
313 	 * the entry becomes a dangling pointer.
314 	 */
315 	QMGR_DELIVER_RELEASE_AGENT(entry);
316 	qmgr_entry_unselect(queue, entry);
317 	qmgr_defer_transport(transport, &dsb->dsn);
318 	return;
319     }
320 
321     /*
322      * This message must be tried again.
323      *
324      * If we have a problem talking to this site, back off with this site for a
325      * while; dispose of queue entries for this site that await selection
326      * (the todo list); stay away from queue entries that have been selected
327      * (the busy list), or we would have dangling pointers. The queue itself
328      * won't go away before we dispose of the current queue entry.
329      *
330      * XXX Caution: DSN_COPY() will panic on empty status or reason.
331      */
332 #define SUSPENDED	"delivery temporarily suspended: "
333 
334     if (status == DELIVER_STAT_DEFER) {
335 	message->flags |= DELIVER_STAT_DEFER;
336 	if (VSTRING_LEN(dsb->status)) {
337 	    /* Sanitize the DSN status/reason from the delivery agent. */
338 	    if (!dsn_valid(vstring_str(dsb->status)))
339 		vstring_strcpy(dsb->status, "4.0.0");
340 	    if (VSTRING_LEN(dsb->reason) == 0)
341 		vstring_strcpy(dsb->reason, "unknown error");
342 	    vstring_prepend(dsb->reason, SUSPENDED, sizeof(SUSPENDED) - 1);
343 	    if (QMGR_QUEUE_READY(queue)) {
344 		qmgr_queue_throttle(queue, DSN_FROM_DSN_BUF(dsb));
345 		if (QMGR_QUEUE_THROTTLED(queue))
346 		    qmgr_defer_todo(queue, &dsb->dsn);
347 	    }
348 	}
349     }
350 
351     /*
352      * No problems detected. Mark the transport and queue as alive. The queue
353      * itself won't go away before we dispose of the current queue entry.
354      */
355     if (status != DELIVER_STAT_CRASH) {
356 	qmgr_transport_unthrottle(transport);
357 	if (VSTRING_LEN(dsb->reason) == 0)
358 	    qmgr_queue_unthrottle(queue);
359     }
360 
361     /*
362      * Release the delivery process, and give some other queue entry a chance
363      * to be delivered. When all recipients for a message have been tried,
364      * decide what to do next with this message: defer, bounce, delete.
365      */
366     QMGR_DELIVER_RELEASE_AGENT(entry);
367     qmgr_entry_done(entry, QMGR_QUEUE_BUSY);
368 }
369 
370 /* qmgr_deliver - deliver one per-site queue entry */
371 
qmgr_deliver(QMGR_TRANSPORT * transport,VSTREAM * stream)372 void    qmgr_deliver(QMGR_TRANSPORT *transport, VSTREAM *stream)
373 {
374     QMGR_QUEUE *queue;
375     QMGR_ENTRY *entry;
376     DSN     dsn;
377 
378     /*
379      * Find out if this delivery process is really available. Once elected,
380      * the delivery process is supposed to express its happiness. If there is
381      * a problem, wipe the pending deliveries for this transport. This
382      * routine runs in response to an external event, so it does not run
383      * while some other queue manipulation is happening.
384      */
385     if (stream == 0 || qmgr_deliver_initial_reply(stream) != 0) {
386 #if 0
387 	whatsup = concatenate(transport->name,
388 			      " mail transport unavailable", (char *) 0);
389 	qmgr_transport_throttle(transport,
390 				DSN_SIMPLE(&dsn, "4.3.0", whatsup));
391 	myfree(whatsup);
392 #else
393 	qmgr_transport_throttle(transport,
394 				DSN_SIMPLE(&dsn, "4.3.0",
395 					   "mail transport unavailable"));
396 #endif
397 	qmgr_defer_transport(transport, &dsn);
398 	if (stream)
399 	    (void) vstream_fclose(stream);
400 	return;
401     }
402 
403     /*
404      * Find a suitable queue entry. Things may have changed since this
405      * transport was allocated. If no suitable entry is found,
406      * unceremoniously disconnect from the delivery process. The delivery
407      * agent request reading routine is prepared for the queue manager to
408      * change its mind for no apparent reason.
409      */
410     if ((queue = qmgr_queue_select(transport)) == 0
411 	|| (entry = qmgr_entry_select(queue)) == 0) {
412 	(void) vstream_fclose(stream);
413 	return;
414     }
415 
416     /*
417      * Send the queue file info and recipient info to the delivery process.
418      * If there is a problem, wipe the pending deliveries for this transport.
419      * This routine runs in response to an external event, so it does not run
420      * while some other queue manipulation is happening.
421      */
422     if (qmgr_deliver_send_request(entry, stream) < 0) {
423 	qmgr_entry_unselect(queue, entry);
424 #if 0
425 	whatsup = concatenate(transport->name,
426 			      " mail transport unavailable", (char *) 0);
427 	qmgr_transport_throttle(transport,
428 				DSN_SIMPLE(&dsn, "4.3.0", whatsup));
429 	myfree(whatsup);
430 #else
431 	qmgr_transport_throttle(transport,
432 				DSN_SIMPLE(&dsn, "4.3.0",
433 					   "mail transport unavailable"));
434 #endif
435 	qmgr_defer_transport(transport, &dsn);
436 	/* warning: entry and queue may be dangling pointers here */
437 	(void) vstream_fclose(stream);
438 	return;
439     }
440 
441     /*
442      * If we get this far, go wait for the delivery status report.
443      */
444     qmgr_deliver_concurrency++;
445     entry->stream = stream;
446     event_enable_read(vstream_fileno(stream),
447 		      qmgr_deliver_update, (void *) entry);
448 
449     /*
450      * Guard against broken systems.
451      */
452     event_request_timer(qmgr_deliver_abort, (void *) entry, var_daemon_timeout);
453 }
454