1 /*++
2 /* NAME
3 /*	qmgr_deliver 3
4 /* SUMMARY
5 /*	deliver one per-site queue entry to that site
6 /* SYNOPSIS
7 /*	#include "qmgr.h"
8 /*
9 /*	int	qmgr_deliver_concurrency;
10 /*
11 /*	void	qmgr_deliver(transport, fp)
12 /*	QMGR_TRANSPORT *transport;
13 /*	VSTREAM	*fp;
14 /* DESCRIPTION
15 /*	This module implements the client side of the `queue manager
16 /*	to delivery agent' protocol. The queue manager uses
17 /*	asynchronous I/O so that it can drive multiple delivery
18 /*	agents in parallel. Depending on the outcome of a delivery
19 /*	attempt, the status of messages, queues and transports is
20 /*	updated.
21 /*
22 /*	qmgr_deliver_concurrency is a global counter that says how
23 /*	many delivery processes are in use. This can be used, for
24 /*	example, to control the size of the `active' message queue.
25 /*
26 /*	qmgr_deliver() executes when a delivery process announces its
27 /*	availability for the named transport. It arranges for delivery
28 /*	of a suitable queue entry.  The \fIfp\fR argument specifies a
29 /*	stream that is connected to a delivery process, or a null
30 /*	pointer if the transport accepts no connection. Upon completion
31 /*	of delivery (successful or not), the stream is closed, so that the
32 /*	delivery process is released.
33 /* DIAGNOSTICS
34 /* LICENSE
35 /* .ad
36 /* .fi
37 /*	The Secure Mailer license must be distributed with this software.
38 /* AUTHOR(S)
39 /*	Wietse Venema
40 /*	IBM T.J. Watson Research
41 /*	P.O. Box 704
42 /*	Yorktown Heights, NY 10598, USA
43 /*
44 /*	Preemptive scheduler enhancements:
45 /*	Patrik Rak
46 /*	Modra 6
47 /*	155 00, Prague, Czech Republic
48 /*
49 /*	Wietse Venema
50 /*	Google, Inc.
51 /*	111 8th Avenue
52 /*	New York, NY 10011, USA
53 /*--*/
54 
55 /* System library. */
56 
57 #include <sys_defs.h>
58 #include <time.h>
59 #include <string.h>
60 
61 /* Utility library. */
62 
63 #include <msg.h>
64 #include <vstring.h>
65 #include <vstream.h>
66 #include <vstring_vstream.h>
67 #include <events.h>
68 #include <iostuff.h>
69 #include <stringops.h>
70 #include <mymalloc.h>
71 
72 /* Global library. */
73 
74 #include <mail_queue.h>
75 #include <mail_proto.h>
76 #include <recipient_list.h>
77 #include <mail_params.h>
78 #include <deliver_request.h>
79 #include <verp_sender.h>
80 #include <dsn_util.h>
81 #include <dsn_buf.h>
82 #include <dsb_scan.h>
83 #include <rcpt_print.h>
84 #include <smtputf8.h>
85 
86 /* Application-specific. */
87 
88 #include "qmgr.h"
89 
90  /*
91   * Important note on the _transport_rate_delay implementation: after
92   * qmgr_transport_alloc() sets the QMGR_TRANSPORT_STAT_RATE_LOCK flag, all
93   * code paths must directly or indirectly invoke qmgr_transport_unthrottle()
94   * or qmgr_transport_throttle(). Otherwise, transports with non-zero
95   * _transport_rate_delay will become stuck.
96   */
97 
98 int     qmgr_deliver_concurrency;
99 
100  /*
101   * Message delivery status codes.
102   */
103 #define DELIVER_STAT_OK		0	/* all recipients delivered */
104 #define DELIVER_STAT_DEFER	1	/* try some recipients later */
105 #define DELIVER_STAT_CRASH	2	/* mailer internal problem */
106 
107 /* qmgr_deliver_initial_reply - retrieve initial delivery process response */
108 
qmgr_deliver_initial_reply(VSTREAM * stream)109 static int qmgr_deliver_initial_reply(VSTREAM *stream)
110 {
111     if (peekfd(vstream_fileno(stream)) < 0) {
112 	msg_warn("%s: premature disconnect", VSTREAM_PATH(stream));
113 	return (DELIVER_STAT_CRASH);
114     } else if (attr_scan(stream, ATTR_FLAG_STRICT,
115 		  RECV_ATTR_STREQ(MAIL_ATTR_PROTO, MAIL_ATTR_PROTO_DELIVER),
116 			 ATTR_TYPE_END) != 0) {
117 	msg_warn("%s: malformed response", VSTREAM_PATH(stream));
118 	return (DELIVER_STAT_DEFER);
119     } else {
120 	return (0);
121     }
122 }
123 
124 /* qmgr_deliver_final_reply - retrieve final delivery process response */
125 
qmgr_deliver_final_reply(VSTREAM * stream,DSN_BUF * dsb)126 static int qmgr_deliver_final_reply(VSTREAM *stream, DSN_BUF *dsb)
127 {
128     int     stat;
129 
130     if (peekfd(vstream_fileno(stream)) < 0) {
131 	msg_warn("%s: premature disconnect", VSTREAM_PATH(stream));
132 	return (DELIVER_STAT_CRASH);
133     } else if (attr_scan(stream, ATTR_FLAG_STRICT,
134 			 RECV_ATTR_FUNC(dsb_scan, (void *) dsb),
135 			 RECV_ATTR_INT(MAIL_ATTR_STATUS, &stat),
136 			 ATTR_TYPE_END) != 2) {
137 	msg_warn("%s: malformed response", VSTREAM_PATH(stream));
138 	return (DELIVER_STAT_CRASH);
139     } else {
140 	return (stat ? DELIVER_STAT_DEFER : 0);
141     }
142 }
143 
144 /* qmgr_deliver_send_request - send delivery request to delivery process */
145 
qmgr_deliver_send_request(QMGR_ENTRY * entry,VSTREAM * stream)146 static int qmgr_deliver_send_request(QMGR_ENTRY *entry, VSTREAM *stream)
147 {
148     RECIPIENT_LIST list = entry->rcpt_list;
149     RECIPIENT *recipient;
150     QMGR_MESSAGE *message = entry->message;
151     VSTRING *sender_buf = 0;
152     MSG_STATS stats;
153     char   *sender;
154     int     flags;
155     int     smtputf8 = message->smtputf8;
156     const char *addr;
157 
158     /*
159      * Todo: integrate with code up-stream that builds the delivery request.
160      */
161     for (recipient = list.info; recipient < list.info + list.len; recipient++)
162 	if (var_smtputf8_enable && (addr = recipient->address)[0]
163 	    && !allascii(addr) && valid_utf8_string(addr, strlen(addr))) {
164 	    smtputf8 |= SMTPUTF8_FLAG_RECIPIENT;
165 	    if (message->verp_delims)
166 		smtputf8 |= SMTPUTF8_FLAG_SENDER;
167 	}
168 
169     /*
170      * If variable envelope return path is requested, change prefix+@origin
171      * into prefix+user=domain@origin. Note that with VERP there is only one
172      * recipient per delivery.
173      */
174     if (message->verp_delims == 0) {
175 	sender = message->sender;
176     } else {
177 	sender_buf = vstring_alloc(100);
178 	verp_sender(sender_buf, message->verp_delims,
179 		    message->sender, list.info);
180 	sender = vstring_str(sender_buf);
181     }
182 
183     flags = message->tflags
184 	| entry->queue->dflags
185 	| (message->inspect_xport ? DEL_REQ_FLAG_BOUNCE : DEL_REQ_FLAG_DEFLT);
186     (void) QMGR_MSG_STATS(&stats, message);
187     attr_print(stream, ATTR_FLAG_NONE,
188 	       SEND_ATTR_INT(MAIL_ATTR_FLAGS, flags),
189 	       SEND_ATTR_STR(MAIL_ATTR_QUEUE, message->queue_name),
190 	       SEND_ATTR_STR(MAIL_ATTR_QUEUEID, message->queue_id),
191 	       SEND_ATTR_LONG(MAIL_ATTR_OFFSET, message->data_offset),
192 	       SEND_ATTR_LONG(MAIL_ATTR_SIZE, message->cont_length),
193 	       SEND_ATTR_STR(MAIL_ATTR_NEXTHOP, entry->queue->nexthop),
194 	       SEND_ATTR_STR(MAIL_ATTR_ENCODING, message->encoding),
195 	       SEND_ATTR_INT(MAIL_ATTR_SMTPUTF8, smtputf8),
196 	       SEND_ATTR_STR(MAIL_ATTR_SENDER, sender),
197 	       SEND_ATTR_STR(MAIL_ATTR_DSN_ENVID, message->dsn_envid),
198 	       SEND_ATTR_INT(MAIL_ATTR_DSN_RET, message->dsn_ret),
199 	       SEND_ATTR_FUNC(msg_stats_print, (const void *) &stats),
200     /* XXX Should be encapsulated with ATTR_TYPE_FUNC. */
201 	     SEND_ATTR_STR(MAIL_ATTR_LOG_CLIENT_NAME, message->client_name),
202 	     SEND_ATTR_STR(MAIL_ATTR_LOG_CLIENT_ADDR, message->client_addr),
203 	     SEND_ATTR_STR(MAIL_ATTR_LOG_CLIENT_PORT, message->client_port),
204 	     SEND_ATTR_STR(MAIL_ATTR_LOG_PROTO_NAME, message->client_proto),
205 	       SEND_ATTR_STR(MAIL_ATTR_LOG_HELO_NAME, message->client_helo),
206     /* XXX Should be encapsulated with ATTR_TYPE_FUNC. */
207 	       SEND_ATTR_STR(MAIL_ATTR_SASL_METHOD, message->sasl_method),
208 	     SEND_ATTR_STR(MAIL_ATTR_SASL_USERNAME, message->sasl_username),
209 	       SEND_ATTR_STR(MAIL_ATTR_SASL_SENDER, message->sasl_sender),
210     /* XXX Ditto if we want to pass TLS certificate info. */
211 	       SEND_ATTR_STR(MAIL_ATTR_LOG_IDENT, message->log_ident),
212 	     SEND_ATTR_STR(MAIL_ATTR_RWR_CONTEXT, message->rewrite_context),
213 	       SEND_ATTR_INT(MAIL_ATTR_RCPT_COUNT, list.len),
214 	       ATTR_TYPE_END);
215     if (sender_buf != 0)
216 	vstring_free(sender_buf);
217     for (recipient = list.info; recipient < list.info + list.len; recipient++)
218 	attr_print(stream, ATTR_FLAG_NONE,
219 		   SEND_ATTR_FUNC(rcpt_print, (const void *) recipient),
220 		   ATTR_TYPE_END);
221     if (vstream_fflush(stream) != 0) {
222 	msg_warn("write to process (%s): %m", entry->queue->transport->name);
223 	return (-1);
224     } else {
225 	if (msg_verbose)
226 	    msg_info("qmgr_deliver: site `%s'", entry->queue->name);
227 	return (0);
228     }
229 }
230 
231 /* qmgr_deliver_abort - transport response watchdog */
232 
qmgr_deliver_abort(int unused_event,void * context)233 static void qmgr_deliver_abort(int unused_event, void *context)
234 {
235     QMGR_ENTRY *entry = (QMGR_ENTRY *) context;
236     QMGR_QUEUE *queue = entry->queue;
237     QMGR_TRANSPORT *transport = queue->transport;
238     QMGR_MESSAGE *message = entry->message;
239 
240     msg_fatal("%s: timeout receiving delivery status from transport: %s",
241 	      message->queue_id, transport->name);
242 }
243 
244 /* qmgr_deliver_update - process delivery status report */
245 
qmgr_deliver_update(int unused_event,void * context)246 static void qmgr_deliver_update(int unused_event, void *context)
247 {
248     QMGR_ENTRY *entry = (QMGR_ENTRY *) context;
249     QMGR_QUEUE *queue = entry->queue;
250     QMGR_TRANSPORT *transport = queue->transport;
251     QMGR_MESSAGE *message = entry->message;
252     static DSN_BUF *dsb;
253     int     status;
254 
255     /*
256      * Release the delivery agent from a "hot" queue entry.
257      */
258 #define QMGR_DELIVER_RELEASE_AGENT(entry) do { \
259 	event_disable_readwrite(vstream_fileno(entry->stream)); \
260 	(void) vstream_fclose(entry->stream); \
261 	entry->stream = 0; \
262 	qmgr_deliver_concurrency--; \
263     } while (0)
264 
265     if (dsb == 0)
266 	dsb = dsb_create();
267 
268     /*
269      * The message transport has responded. Stop the watchdog timer.
270      */
271     event_cancel_timer(qmgr_deliver_abort, context);
272 
273     /*
274      * Retrieve the delivery agent status report. The numerical status code
275      * indicates if delivery should be tried again. The reason text is sent
276      * only when a site should be avoided for a while, so that the queue
277      * manager can log why it does not even try to schedule delivery to the
278      * affected recipients.
279      */
280     status = qmgr_deliver_final_reply(entry->stream, dsb);
281 
282     /*
283      * The mail delivery process failed for some reason (although delivery
284      * may have been successful). Back off with this transport type for a
285      * while. Dispose of queue entries for this transport that await
286      * selection (the todo lists). Stay away from queue entries that have
287      * been selected (the busy lists), or we would have dangling pointers.
288      * The queue itself won't go away before we dispose of the current queue
289      * entry.
290      */
291     if (status == DELIVER_STAT_CRASH) {
292 	message->flags |= DELIVER_STAT_DEFER;
293 #if 0
294 	whatsup = concatenate("unknown ", transport->name,
295 			      " mail transport error", (char *) 0);
296 	qmgr_transport_throttle(transport,
297 				DSN_SIMPLE(&dsb->dsn, "4.3.0", whatsup));
298 	myfree(whatsup);
299 #else
300 	qmgr_transport_throttle(transport,
301 				DSN_SIMPLE(&dsb->dsn, "4.3.0",
302 					   "unknown mail transport error"));
303 #endif
304 	msg_warn("transport %s failure -- see a previous warning/fatal/panic logfile record for the problem description",
305 		 transport->name);
306 
307 	/*
308 	 * Assume the worst and write a defer logfile record for each
309 	 * recipient. This omission was already present in the first queue
310 	 * manager implementation of 199703, and was fixed 200511.
311 	 *
312 	 * To avoid the synchronous qmgr_defer_recipient() operation for each
313 	 * recipient of this queue entry, release the delivery process and
314 	 * move the entry back to the todo queue. Let qmgr_defer_transport()
315 	 * log the recipient asynchronously if possible, and get out of here.
316 	 * Note: if asynchronous logging is not possible,
317 	 * qmgr_defer_transport() eventually invokes qmgr_entry_done() and
318 	 * the entry becomes a dangling pointer.
319 	 */
320 	QMGR_DELIVER_RELEASE_AGENT(entry);
321 	qmgr_entry_unselect(entry);
322 	qmgr_defer_transport(transport, &dsb->dsn);
323 	return;
324     }
325 
326     /*
327      * This message must be tried again.
328      *
329      * If we have a problem talking to this site, back off with this site for a
330      * while; dispose of queue entries for this site that await selection
331      * (the todo list); stay away from queue entries that have been selected
332      * (the busy list), or we would have dangling pointers. The queue itself
333      * won't go away before we dispose of the current queue entry.
334      *
335      * XXX Caution: DSN_COPY() will panic on empty status or reason.
336      */
337 #define SUSPENDED	"delivery temporarily suspended: "
338 
339     if (status == DELIVER_STAT_DEFER) {
340 	message->flags |= DELIVER_STAT_DEFER;
341 	if (VSTRING_LEN(dsb->status)) {
342 	    /* Sanitize the DSN status/reason from the delivery agent. */
343 	    if (!dsn_valid(vstring_str(dsb->status)))
344 		vstring_strcpy(dsb->status, "4.0.0");
345 	    if (VSTRING_LEN(dsb->reason) == 0)
346 		vstring_strcpy(dsb->reason, "unknown error");
347 	    vstring_prepend(dsb->reason, SUSPENDED, sizeof(SUSPENDED) - 1);
348 	    if (QMGR_QUEUE_READY(queue)) {
349 		qmgr_queue_throttle(queue, DSN_FROM_DSN_BUF(dsb));
350 		if (QMGR_QUEUE_THROTTLED(queue))
351 		    qmgr_defer_todo(queue, &dsb->dsn);
352 	    }
353 	}
354     }
355 
356     /*
357      * No problems detected. Mark the transport and queue as alive. The queue
358      * itself won't go away before we dispose of the current queue entry.
359      */
360     if (status != DELIVER_STAT_CRASH) {
361 	qmgr_transport_unthrottle(transport);
362 	if (VSTRING_LEN(dsb->reason) == 0)
363 	    qmgr_queue_unthrottle(queue);
364     }
365 
366     /*
367      * Release the delivery process, and give some other queue entry a chance
368      * to be delivered. When all recipients for a message have been tried,
369      * decide what to do next with this message: defer, bounce, delete.
370      */
371     QMGR_DELIVER_RELEASE_AGENT(entry);
372     qmgr_entry_done(entry, QMGR_QUEUE_BUSY);
373 }
374 
375 /* qmgr_deliver - deliver one per-site queue entry */
376 
qmgr_deliver(QMGR_TRANSPORT * transport,VSTREAM * stream)377 void    qmgr_deliver(QMGR_TRANSPORT *transport, VSTREAM *stream)
378 {
379     QMGR_ENTRY *entry;
380     DSN     dsn;
381 
382     /*
383      * Find out if this delivery process is really available. Once elected,
384      * the delivery process is supposed to express its happiness. If there is
385      * a problem, wipe the pending deliveries for this transport. This
386      * routine runs in response to an external event, so it does not run
387      * while some other queue manipulation is happening.
388      */
389     if (stream == 0 || qmgr_deliver_initial_reply(stream) != 0) {
390 #if 0
391 	whatsup = concatenate(transport->name,
392 			      " mail transport unavailable", (char *) 0);
393 	qmgr_transport_throttle(transport,
394 				DSN_SIMPLE(&dsn, "4.3.0", whatsup));
395 	myfree(whatsup);
396 #else
397 	qmgr_transport_throttle(transport,
398 				DSN_SIMPLE(&dsn, "4.3.0",
399 					   "mail transport unavailable"));
400 #endif
401 	qmgr_defer_transport(transport, &dsn);
402 	if (stream)
403 	    (void) vstream_fclose(stream);
404 	return;
405     }
406 
407     /*
408      * Find a suitable queue entry. Things may have changed since this
409      * transport was allocated. If no suitable entry is found,
410      * unceremoniously disconnect from the delivery process. The delivery
411      * agent request reading routine is prepared for the queue manager to
412      * change its mind for no apparent reason.
413      */
414     if ((entry = qmgr_job_entry_select(transport)) == 0) {
415 	(void) vstream_fclose(stream);
416 	return;
417     }
418 
419     /*
420      * Send the queue file info and recipient info to the delivery process.
421      * If there is a problem, wipe the pending deliveries for this transport.
422      * This routine runs in response to an external event, so it does not run
423      * while some other queue manipulation is happening.
424      */
425     if (qmgr_deliver_send_request(entry, stream) < 0) {
426 	qmgr_entry_unselect(entry);
427 #if 0
428 	whatsup = concatenate(transport->name,
429 			      " mail transport unavailable", (char *) 0);
430 	qmgr_transport_throttle(transport,
431 				DSN_SIMPLE(&dsn, "4.3.0", whatsup));
432 	myfree(whatsup);
433 #else
434 	qmgr_transport_throttle(transport,
435 				DSN_SIMPLE(&dsn, "4.3.0",
436 					   "mail transport unavailable"));
437 #endif
438 	qmgr_defer_transport(transport, &dsn);
439 	/* warning: entry may be a dangling pointer here */
440 	(void) vstream_fclose(stream);
441 	return;
442     }
443 
444     /*
445      * If we get this far, go wait for the delivery status report.
446      */
447     qmgr_deliver_concurrency++;
448     entry->stream = stream;
449     event_enable_read(vstream_fileno(stream),
450 		      qmgr_deliver_update, (void *) entry);
451 
452     /*
453      * Guard against broken systems.
454      */
455     event_request_timer(qmgr_deliver_abort, (void *) entry, var_daemon_timeout);
456 }
457